MCP Server with LLM Integration

by MelaLitho
mcp_llm_integration.py (17.3 kB)
""" MCP Server Integration with Multi-Provider LLM Support Integrates the multi-provider LLM system with your MCP server """ import os import json from typing import Dict, Any, Optional, List, AsyncGenerator from llmintegrationsystem import ( LLMIntegrationSystem, LLMProvider as LLMProviderModel, ChatMessage, LLMResponse ) class MCPServerWithMultiLLM: """ Enhanced MCP Server with multi-provider LLM support Integrates with your existing MCP server and chat memory """ def __init__(self, app, config: Dict[str, Any]): # Existing MCP server initialization self.app = app self.config = config self.pool = None # PostgreSQL pool # Initialize LLM integration system self.llm = LLMIntegrationSystem() # Initialize chat memory (from your existing code) from mcp_chat_memory import ChatMemoryManager self.memory_manager = ChatMemoryManager(self.pool, config) async def generate_sql(self, question: str, schema_context: Dict[str, Any], provider: Optional[str] = None) -> str: """Generate SQL using LLM with specified provider""" prompt = f"""You are a PostgreSQL expert. Generate a safe SELECT query for the user's question. Database Schema: {json.dumps(schema_context, indent=2)} User Question: {question} Requirements: - Only SELECT statements (no INSERT, UPDATE, DELETE, DROP, etc.) - Use proper JOIN syntax - Include appropriate WHERE clauses - Add LIMIT clauses for large result sets (max 100 rows) - Use PostgreSQL-specific syntax - Return ONLY the SQL query without markdown formatting or explanations SQL Query:""" # Use default model if no provider specified model = "mistral:latest" if provider == "ollama" else "gpt-3.5-turbo" response = await self.llm.chat(prompt, model=model, provider=provider) # Clean SQL response sql = response.strip() if sql.startswith('```'): sql = sql.split('```')[1] if sql.startswith('sql'): sql = sql[3:] sql = sql.strip().rstrip(';') return sql async def summarize_results(self, question: str, data: List[Dict], provider: Optional[str] = None) -> str: """Summarize query results using LLM""" prompt = f"""Summarize these database query results to answer the user's question. Question: {question} Query Results: {json.dumps(data[:20], default=str, indent=2)} {f"... and {len(data)-20} more rows" if len(data) > 20 else ""} Provide a clear, concise summary in markdown format: - Start with a direct answer to the question - Include key statistics or insights - Use tables for structured data when appropriate - Keep the summary under 500 words""" # Use default model if no provider specified model = "mistral:latest" if provider == "ollama" else "gpt-3.5-turbo" return await self.llm.chat(prompt, model=model, provider=provider) async def process_query_stream(self, question: str, session_id: str, provider: Optional[str] = None) -> AsyncGenerator[str, None]: """Process query with streaming response""" # Add to memory if self.memory_manager: await self.memory_manager.add_message(session_id, 'user', question) context = await self.memory_manager.get_context(session_id) else: context = [] # Get schema context schema_context = await self._get_schema_context(question) # Stream SQL generation yield json.dumps({ 'type': 'status', 'content': 'Generating SQL query...' 
}) + '\n' sql_prompt = self._build_sql_prompt(question, schema_context, context) sql_query = "" model = "mistral:latest" if provider == "ollama" else "gpt-3.5-turbo" # Use streaming only if provider is ollama, otherwise use regular chat if provider == "ollama": async for chunk in self.llm.chat_stream(sql_prompt, model=model, provider=provider): content = chunk.get('content', '') sql_query += content yield json.dumps({ 'type': 'sql_generation', 'content': content }) + '\n' else: # For non-ollama providers, use regular chat sql_query = await self.llm.chat(sql_prompt, model=model, provider=provider) yield json.dumps({ 'type': 'sql_generation', 'content': sql_query }) + '\n' # Clean SQL sql_query = self._clean_sql(sql_query) # Execute SQL yield json.dumps({ 'type': 'status', 'content': 'Executing query...' }) + '\n' result = await self.execute_sql_query(sql_query) if result.get('success'): yield json.dumps({ 'type': 'query_result', 'content': result }) + '\n' # Stream summary yield json.dumps({ 'type': 'status', 'content': 'Analyzing results...' }) + '\n' summary_prompt = self._build_summary_prompt(question, result['data'], context) summary = "" # Use streaming only if provider is ollama, otherwise use regular chat if provider == "ollama": async for chunk in self.llm.chat_stream(summary_prompt, model=model, provider=provider): content = chunk.get('content', '') summary += content yield json.dumps({ 'type': 'summary', 'content': content }) + '\n' else: # For non-ollama providers, use regular chat summary = await self.llm.chat(summary_prompt, model=model, provider=provider) yield json.dumps({ 'type': 'summary', 'content': summary }) + '\n' # Store in memory if self.memory_manager: await self.memory_manager.add_message(session_id, 'assistant', summary) else: error_msg = result.get('error', 'Query execution failed') yield json.dumps({ 'type': 'error', 'content': error_msg }) + '\n' async def generate_embedding(self, text: str, provider: Optional[str] = None) -> List[float]: """Generate embedding using configured provider - Not implemented in current LLM system""" raise NotImplementedError("Embedding generation not available in current LLM integration system") async def handle_mcp_request_with_llm(self, request_data: Dict[str, Any]) -> Dict[str, Any]: """Handle MCP request with LLM provider selection""" method = request_data.get('method') params = request_data.get('params', {}) # Extract provider preference from params provider = params.pop('llm_provider', None) if method == 'llm/providers': # Return list of available providers return { 'providers': [ {'name': name, 'models': provider.models, 'default_model': provider.default_model} for name, provider in self.llm.providers.items() ] } elif method == 'llm/complete': # Direct completion prompt = params.get('prompt') model = params.get('model', 'mistral:latest' if provider == 'ollama' else 'gpt-3.5-turbo') response = await self.llm.chat(prompt, model=model, provider=provider) return {'response': response} elif method == 'llm/chat': # Chat completion messages = params.get('messages', []) # Convert messages to conversation history format if needed conversation_history = [] if messages: conversation_history = [ChatMessage(role=msg['role'], content=msg['content']) for msg in messages[:-1]] current_message = messages[-1]['content'] else: current_message = "" model = params.get('model', 'mistral:latest' if provider == 'ollama' else 'gpt-3.5-turbo') response = await self.llm.chat(current_message, model=model, provider=provider, 
conversation_history=conversation_history) return {'response': response} elif method == 'llm/embed': # Generate embedding - not implemented return {'error': 'Embedding generation not available in current LLM integration system'} # Handle other MCP methods... return {'error': f'Unknown method: {method}'} def _build_sql_prompt(self, question: str, schema_context: Dict, conversation_context: List) -> str: """Build SQL generation prompt with context""" prompt_parts = [] # Add conversation context if available if conversation_context: recent_context = "\n".join([ f"{msg.role}: {msg.content[:200]}" for msg in conversation_context[-3:] ]) prompt_parts.append(f"Recent conversation:\n{recent_context}\n") prompt_parts.extend([ "You are a PostgreSQL expert. Generate a safe SELECT query.", f"\nDatabase Schema:\n{json.dumps(schema_context, indent=2)}", f"\nUser Question: {question}", "\nRequirements:", "- Only SELECT statements", "- Proper PostgreSQL syntax", "- Include LIMIT clause (max 100)", "- Return ONLY the SQL query", "\nSQL Query:" ]) return "\n".join(prompt_parts) def _build_summary_prompt(self, question: str, data: List[Dict], conversation_context: List) -> str: """Build summary prompt with context""" prompt_parts = [] if conversation_context: prompt_parts.append("Consider the conversation context when summarizing.\n") prompt_parts.extend([ f"Question: {question}\n", f"Query returned {len(data)} rows.\n", f"Data sample:\n{json.dumps(data[:10], default=str, indent=2)}\n", "Provide a concise markdown summary that directly answers the question." ]) return "\n".join(prompt_parts) def _clean_sql(self, sql: str) -> str: """Clean SQL response from LLM""" sql = sql.strip() # Remove markdown code blocks if '```' in sql: parts = sql.split('```') for part in parts: if 'SELECT' in part.upper(): sql = part break # Remove sql language identifier if sql.lower().startswith('sql'): sql = sql[3:].strip() # Remove trailing semicolon for safety sql = sql.rstrip(';').strip() return sql async def cleanup(self): """Cleanup resources""" await self.llm.close() if self.pool: await self.pool.close() # Configuration template def create_mcp_config() -> Dict[str, Any]: """Create MCP configuration with LLM providers""" return { # Database configuration 'db_host': os.getenv('DB_HOST', 'localhost'), 'db_port': int(os.getenv('DB_PORT', 5432)), 'db_name': os.getenv('DB_NAME', 'your_database'), 'db_user': os.getenv('DB_USER', 'your_user'), 'db_password': os.getenv('DB_PASSWORD', 'your_password'), # Ollama configuration (primary/default) 'ollama': { 'model': os.getenv('OLLAMA_MODEL', 'llama3.2'), 'base_url': os.getenv('OLLAMA_URL', 'http://localhost:11434'), 'temperature': float(os.getenv('OLLAMA_TEMP', 0.7)), 'max_tokens': int(os.getenv('OLLAMA_MAX_TOKENS', 4096)), 'use_for_embeddings': True }, # Specialized Ollama models 'ollama_models': { 'ollama-code': { 'model': 'codellama', 'temperature': 0.1, 'extra_params': { 'num_ctx': 8192, # Larger context for code 'repeat_penalty': 1.1 } }, 'ollama-sql': { 'model': 'sqlcoder', # If you have it 'temperature': 0.1, 'extra_params': { 'stop': [';', '\n\n'] } }, 'ollama-embed': { 'model': 'nomic-embed-text', 'temperature': 0.0 } }, # OpenAI configuration (optional) 'openai': { 'api_key': os.getenv('OPENAI_API_KEY'), 'model': os.getenv('OPENAI_MODEL', 'gpt-4-turbo-preview'), 'temperature': float(os.getenv('OPENAI_TEMP', 0.7)), 'max_tokens': int(os.getenv('OPENAI_MAX_TOKENS', 4096)) }, # Anthropic configuration (optional) 'anthropic': { 'api_key': os.getenv('ANTHROPIC_API_KEY'), 'model': 
os.getenv('CLAUDE_MODEL', 'claude-3-opus-20240229'), 'temperature': float(os.getenv('CLAUDE_TEMP', 0.7)), 'max_tokens': int(os.getenv('CLAUDE_MAX_TOKENS', 4096)) }, # Google configuration (optional) 'google': { 'api_key': os.getenv('GOOGLE_API_KEY'), 'model': os.getenv('GEMINI_MODEL', 'gemini-1.5-pro'), 'temperature': float(os.getenv('GEMINI_TEMP', 0.7)), 'max_tokens': int(os.getenv('GEMINI_MAX_TOKENS', 4096)) }, # Memory configuration 'max_context_tokens': int(os.getenv('MAX_CONTEXT_TOKENS', 8000)), 'short_term_window': int(os.getenv('SHORT_TERM_WINDOW', 20)), 'summarization_interval': int(os.getenv('SUMMARIZATION_INTERVAL', 10)) } # Flask app integration from flask import Flask, request, jsonify, Response import json def create_app(): """Create Flask app with multi-LLM MCP server""" app = Flask(__name__) # Create configuration config = create_mcp_config() # Initialize MCP server with multi-LLM support mcp_server = MCPServerWithMultiLLM(app, config) @app.route('/mcp', methods=['POST']) async def mcp_endpoint(): """Main MCP endpoint""" data = request.get_json() response = await mcp_server.handle_mcp_request_with_llm(data) return jsonify(response) @app.route('/query/stream', methods=['POST']) def stream_query(): """Streaming query endpoint""" data = request.get_json() question = data.get('question') session_id = data.get('session_id', str(uuid4())) provider = data.get('provider') # Optional provider selection def generate(): import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def async_gen(): async for chunk in mcp_server.process_query_stream( question, session_id, provider ): yield chunk for item in loop.run_until_complete(async_gen()): yield item return Response( generate(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no' } ) @app.route('/llm/providers', methods=['GET']) def list_providers(): """List available LLM providers""" return jsonify({ 'providers': [ mcp_server.llm.get_provider_info(name) for name in mcp_server.llm.providers.keys() ], 'default': mcp_server.llm.default_provider, 'default_embedding': mcp_server.llm.default_embedding_provider }) @app.route('/llm/test', methods=['POST']) async def test_provider(): """Test a specific LLM provider""" data = request.get_json() provider = data.get('provider') prompt = data.get('prompt', 'Hello, please respond with "OK" if you are working.') try: response = await mcp_server.llm.complete(prompt, provider=provider) return jsonify({ 'success': True, 'provider': provider, 'response': response }) except Exception as e: return jsonify({ 'success': False, 'provider': provider, 'error': str(e) }), 500 return app
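
As a quick illustration (not part of the original file), here is a minimal sketch of calling the /mcp endpoint defined above. It assumes the app returned by create_app() is running locally on port 5000 and that the requests package is installed; the message text is only a placeholder. The payload fields mirror what handle_mcp_request_with_llm reads: method, params.messages, params.model, and the optional params.llm_provider that is popped to select a provider.

# example_mcp_request.py -- sketch only; server address and message are assumptions
import requests

payload = {
    "method": "llm/chat",
    "params": {
        "llm_provider": "ollama",      # optional; popped by the handler to pick a provider
        "model": "mistral:latest",
        "messages": [
            {"role": "user", "content": "Say hello."}   # placeholder message
        ],
    },
}
resp = requests.post("http://localhost:5000/mcp", json=payload)
print(resp.json())  # expected shape: {"response": "..."}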
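
The /query/stream endpoint emits newline-delimited JSON chunks with a 'type' of 'status', 'sql_generation', 'query_result', 'summary', or 'error'. A minimal client sketch under the same assumptions (local server on port 5000, requests installed, placeholder question and session id):

# example_stream_client.py -- sketch only; question and session_id are placeholders
import json
import requests

resp = requests.post(
    "http://localhost:5000/query/stream",
    json={
        "question": "How many rows are in each table?",
        "session_id": "demo-session",
        "provider": "ollama",   # optional; omit to use the default provider
    },
    stream=True,
)
for line in resp.iter_lines():
    if line:
        chunk = json.loads(line)
        print(f"[{chunk['type']}] {chunk['content']}")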
