Skip to main content
Glama
mcp-qdrant-openai-wrapper.py (16 kB)
#!/usr/bin/env python3
"""
MCP Server for Qdrant with OpenAI Embeddings

This script implements a Model Context Protocol (MCP) server that provides
semantic search capabilities using Qdrant vector database and OpenAI embeddings.

The server speaks newline-delimited JSON-RPC 2.0 over stdin/stdout; all
logging goes to stderr so that it never corrupts the protocol stream.

Required environment variables:
- OPENAI_API_KEY: Your OpenAI API key
- QDRANT_URL: Qdrant server URL (default: http://localhost:6333)
- COLLECTION_NAME: Qdrant collection name (default: kindash-codebase-openai)
- OPENAI_EMBEDDING_MODEL: OpenAI embedding model (default: text-embedding-3-small)

Optional environment variables:
- MCP_LOG_LEVEL: Python logging level name (default: INFO)
"""

import os
import sys
import json
import asyncio
import hashlib
import concurrent.futures
from typing import List, Dict, Any, Optional
import logging

# Configure logging with more detail for debugging
log_level = os.environ.get('MCP_LOG_LEVEL', 'INFO')
logging.basicConfig(
    level=getattr(logging, log_level.upper(), logging.INFO),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stderr)  # Log to stderr to avoid interfering with stdout
    ]
)
logger = logging.getLogger(__name__)

try:
    from openai import OpenAI
    from qdrant_client import QdrantClient
    from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
except ImportError as e:
    logger.error(f"Required package not installed: {e}")
    logger.error("Please install: pip install openai qdrant-client")
    sys.exit(1)

# Output dimensions of the OpenAI embedding models. The collection's vector
# size must match the model that produces the vectors, otherwise every upsert
# and search is rejected by Qdrant.
EMBEDDING_DIMENSIONS = {
    "text-embedding-3-small": 1536,
    "text-embedding-3-large": 3072,
    "text-embedding-ada-002": 1536,
}
DEFAULT_EMBEDDING_DIMENSION = 1536

# Standard JSON-RPC 2.0 error codes.
JSONRPC_PARSE_ERROR = -32700
JSONRPC_INVALID_REQUEST = -32600
JSONRPC_METHOD_NOT_FOUND = -32601
JSONRPC_INTERNAL_ERROR = -32603


# MCP Protocol implementation
class MCPServer:
    """Minimal MCP server implementation for Qdrant operations.

    Exposes three tools (``search``, ``store``, ``collection_info``) backed by
    a single Qdrant collection whose vectors are produced by an OpenAI
    embedding model.
    """

    def __init__(self):
        """Read configuration from the environment and connect the clients.

        Raises:
            ValueError: If ``OPENAI_API_KEY`` is not set.
            Exception: Whatever the Qdrant client raises when the collection
                cannot be listed or created.
        """
        self.openai_api_key = os.environ.get("OPENAI_API_KEY")
        if not self.openai_api_key:
            raise ValueError("OPENAI_API_KEY environment variable is required")

        self.qdrant_url = os.environ.get("QDRANT_URL", "http://localhost:6333")
        self.collection_name = os.environ.get("COLLECTION_NAME", "kindash-codebase-openai")
        self.embedding_model = os.environ.get("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small")

        # Initialize clients
        self.openai_client = OpenAI(api_key=self.openai_api_key)
        self.qdrant_client = QdrantClient(url=self.qdrant_url)

        # Verify collection exists
        self._ensure_collection()

    def _embedding_dimension(self) -> int:
        """Return the vector size produced by the configured embedding model.

        Unknown models fall back to ``DEFAULT_EMBEDDING_DIMENSION`` with a
        warning, which preserves the previous hard-coded behaviour.
        """
        dimension = EMBEDDING_DIMENSIONS.get(self.embedding_model)
        if dimension is None:
            logger.warning(
                f"Unknown embedding model {self.embedding_model}; "
                f"assuming {DEFAULT_EMBEDDING_DIMENSION} dimensions"
            )
            return DEFAULT_EMBEDDING_DIMENSION
        return dimension

    def _ensure_collection(self):
        """Ensure the Qdrant collection exists, creating it when missing.

        A newly created collection uses cosine distance and a vector size
        matching the configured embedding model. An existing collection is
        left untouched.

        Raises:
            Exception: Re-raised from the Qdrant client after logging.
        """
        try:
            collections = self.qdrant_client.get_collections()
            collection_names = [c.name for c in collections.collections]

            if self.collection_name not in collection_names:
                logger.info(f"Creating collection: {self.collection_name}")
                self.qdrant_client.create_collection(
                    collection_name=self.collection_name,
                    vectors_config=VectorParams(
                        size=self._embedding_dimension(),
                        distance=Distance.COSINE
                    )
                )
            else:
                logger.info(f"Collection {self.collection_name} already exists")
        except Exception as e:
            logger.error(f"Failed to ensure collection: {e}")
            raise

    def get_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using OpenAI API.

        Args:
            texts: Strings to embed in a single API call.

        Returns:
            One embedding vector per input, in the order of the API response.

        Raises:
            Exception: Re-raised from the OpenAI client after logging.
        """
        try:
            response = self.openai_client.embeddings.create(
                model=self.embedding_model,
                input=texts
            )
            return [data.embedding for data in response.data]
        except Exception as e:
            logger.error(f"Failed to generate embeddings: {e}")
            raise

    async def handle_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Handle one incoming JSON-RPC message.

        Args:
            request: The decoded JSON-RPC message.

        Returns:
            The JSON-RPC response object, or ``None`` when the message is a
            notification (it has no ``id`` member). JSON-RPC 2.0 forbids
            replying to notifications, and MCP clients send
            ``notifications/initialized`` right after ``initialize``.
        """
        if not isinstance(request, dict):
            return self._error_response(
                None, "Invalid request: expected a JSON object", JSONRPC_INVALID_REQUEST
            )

        method = request.get("method")
        params = request.get("params") or {}
        request_id = request.get("id")
        is_notification = "id" not in request

        try:
            if method == "initialize":
                response = self._handle_initialize(request_id)
            elif method == "tools/list":
                response = self._handle_tools_list(request_id)
            elif method == "tools/call":
                response = await self._handle_tool_call(params, request_id)
            else:
                if is_notification:
                    logger.debug(f"Ignoring notification: {method}")
                    return None
                response = self._error_response(
                    request_id, f"Unknown method: {method}", JSONRPC_METHOD_NOT_FOUND
                )
        except Exception as e:
            logger.error(f"Error handling request: {e}")
            response = self._error_response(request_id, str(e))

        # The message was processed, but a notification never gets a reply.
        return None if is_notification else response

    def _handle_initialize(self, request_id: Any) -> Dict[str, Any]:
        """Build the response to the ``initialize`` handshake."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {}
                },
                "serverInfo": {
                    "name": "qdrant-openai-mcp",
                    "version": "1.0.0"
                }
            }
        }

    def _handle_tools_list(self, request_id: Any) -> Dict[str, Any]:
        """Return the JSON-schema descriptions of the available tools."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {
                "tools": [
                    {
                        "name": "search",
                        "description": "Search for code using semantic similarity",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "query": {
                                    "type": "string",
                                    "description": "Search query"
                                },
                                "limit": {
                                    "type": "number",
                                    "description": "Maximum number of results",
                                    "default": 10
                                },
                                "filter": {
                                    "type": "object",
                                    "description": "Optional metadata filters"
                                }
                            },
                            "required": ["query"]
                        }
                    },
                    {
                        "name": "store",
                        "description": "Store code snippet with embeddings",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "content": {
                                    "type": "string",
                                    "description": "Content to store"
                                },
                                "metadata": {
                                    "type": "object",
                                    "description": "Metadata for the content"
                                },
                                "id": {
                                    "type": "string",
                                    "description": "Optional unique ID"
                                }
                            },
                            "required": ["content"]
                        }
                    },
                    {
                        "name": "collection_info",
                        "description": "Get information about the collection",
                        "inputSchema": {
                            "type": "object",
                            "properties": {}
                        }
                    }
                ]
            }
        }

    async def _handle_tool_call(self, params: Dict[str, Any], request_id: Any) -> Dict[str, Any]:
        """Dispatch a ``tools/call`` request to the named tool.

        Args:
            params: The request's ``params`` object (``name``, ``arguments``).
            request_id: JSON-RPC id echoed back in the response.

        Returns:
            A JSON-RPC response whose result carries the tool output as
            pretty-printed JSON text. When the tool reported a failure the
            result is additionally flagged with ``isError``.
        """
        tool_name = params.get("name")
        # A client may send "arguments": null for argument-less tools.
        arguments = params.get("arguments") or {}

        if tool_name == "search":
            result = await self._search(
                query=arguments.get("query"),
                limit=arguments.get("limit", 10),
                filter_dict=arguments.get("filter")
            )
        elif tool_name == "store":
            result = await self._store(
                content=arguments.get("content"),
                metadata=arguments.get("metadata") or {},
                point_id=arguments.get("id")
            )
        elif tool_name == "collection_info":
            result = await self._get_collection_info()
        else:
            return self._error_response(request_id, f"Unknown tool: {tool_name}")

        tool_result: Dict[str, Any] = {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(result, indent=2)
                }
            ]
        }
        # The tools signal failure by returning {"error": ...}; surface that
        # to the client the way MCP expects tool execution errors.
        if isinstance(result, dict) and "error" in result:
            tool_result["isError"] = True

        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": tool_result
        }

    async def _search(self, query: str, limit: int = 10, filter_dict: Optional[Dict] = None) -> Dict[str, Any]:
        """Search the collection for points similar to ``query``.

        Args:
            query: Natural-language search text.
            limit: Maximum number of hits; JSON numbers are coerced to int.
            filter_dict: Optional exact-match conditions on payload fields,
                all of which must hold.

        Returns:
            ``{"query", "count", "results"}`` on success, or ``{"error": ...}``
            on invalid input or any backend failure.
        """
        if not isinstance(query, str) or not query.strip():
            return {"error": "'query' must be a non-empty string"}

        try:
            # The tool schema declares "limit" as a JSON number, so it may
            # arrive as a float (or null); Qdrant expects an integer.
            limit = 10 if limit is None else int(limit)

            # Generate embedding for query
            query_embedding = self.get_embeddings([query])[0]

            # Build filter if provided
            qdrant_filter = None
            if filter_dict:
                conditions = [
                    FieldCondition(key=key, match=MatchValue(value=value))
                    for key, value in filter_dict.items()
                ]
                if conditions:
                    qdrant_filter = Filter(must=conditions)

            # Search
            results = self.qdrant_client.search(
                collection_name=self.collection_name,
                query_vector=query_embedding,
                limit=limit,
                query_filter=qdrant_filter
            )

            # Format results
            formatted_results = [
                {
                    "id": str(result.id),
                    "score": result.score,
                    "metadata": result.payload
                }
                for result in results
            ]

            return {
                "query": query,
                "count": len(formatted_results),
                "results": formatted_results
            }
        except Exception as e:
            logger.error(f"Search error: {e}")
            return {"error": str(e)}

    async def _store(self, content: str, metadata: Dict[str, Any], point_id: Optional[str] = None) -> Dict[str, Any]:
        """Embed ``content`` and upsert it into the collection.

        Args:
            content: Text to embed.
            metadata: Payload stored with the point.
            point_id: Optional point id; when omitted, the MD5 hex digest of
                ``content`` is used so identical content maps to one point.

        Returns:
            ``{"id", "status", "metadata"}`` on success, or ``{"error": ...}``
            on invalid input or any backend failure.
        """
        if not isinstance(content, str) or not content:
            return {"error": "'content' must be a non-empty string"}
        metadata = metadata or {}

        try:
            # Generate embedding
            embedding = self.get_embeddings([content])[0]

            # Generate ID if not provided (a content fingerprint, not a
            # security measure).
            if not point_id:
                point_id = hashlib.md5(content.encode()).hexdigest()

            # NOTE(review): only `metadata` is persisted; `content` itself is
            # not written to the payload. Confirm that is intended.
            point = PointStruct(
                id=point_id,
                vector=embedding,
                payload=metadata
            )

            # Upsert to Qdrant
            self.qdrant_client.upsert(
                collection_name=self.collection_name,
                points=[point]
            )

            return {
                "id": point_id,
                "status": "stored",
                "metadata": metadata
            }
        except Exception as e:
            logger.error(f"Store error: {e}")
            return {"error": str(e)}

    async def _get_collection_info(self) -> Dict[str, Any]:
        """Return basic statistics and vector configuration of the collection.

        Returns:
            A dict with name, counts and vector config, or ``{"error": ...}``
            when the collection cannot be read.
        """
        try:
            info = self.qdrant_client.get_collection(self.collection_name)
            return {
                "name": self.collection_name,
                # Guarded lookup in case the installed client version does
                # not expose this attribute.
                "vectors_count": getattr(info, "vectors_count", None),
                "points_count": info.points_count,
                "config": {
                    "vector_size": info.config.params.vectors.size,
                    "distance": info.config.params.vectors.distance.value
                }
            }
        except Exception as e:
            logger.error(f"Collection info error: {e}")
            return {"error": str(e)}

    def _error_response(self, request_id: Any, message: str,
                        code: int = JSONRPC_INTERNAL_ERROR) -> Dict[str, Any]:
        """Create a JSON-RPC error response.

        Args:
            request_id: Id of the request being answered (``None`` if unknown).
            message: Human-readable error description.
            code: JSON-RPC error code; defaults to -32603 (internal error).
        """
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": code,
                "message": message
            }
        }


def _write_message(message: Dict[str, Any]) -> str:
    """Write one JSON-RPC message to stdout as a single line and flush.

    Returns the serialized message so callers can log it.
    """
    serialized = json.dumps(message)
    sys.stdout.write(serialized + '\n')
    sys.stdout.flush()
    return serialized


async def main():
    """Main entry point for the MCP server.

    Reads newline-delimited JSON-RPC messages from stdin until EOF and writes
    one response line per request to stdout.
    """
    server = MCPServer()
    logger.info("MCP Qdrant OpenAI server started")

    # Use a thread pool executor to read from stdin without blocking the event loop
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()

    def read_line():
        """Blocking read from stdin"""
        return sys.stdin.readline()

    while True:
        try:
            # Read line from stdin in a separate thread
            line = await loop.run_in_executor(executor, read_line)
            if not line:
                logger.info("EOF received, shutting down")
                break

            line = line.strip()
            if not line:
                continue

            # Parse JSON-RPC request
            try:
                request = json.loads(line)
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse JSON: {e}, line: {line}")
                _write_message({
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": JSONRPC_PARSE_ERROR,
                        "message": f"Parse error: {str(e)}"
                    }
                })
                continue

            logger.debug(f"Received request: {request}")

            # Handle request
            response = await server.handle_request(request)

            # Notifications produce no response; nothing to write.
            if response is None:
                continue

            # Write response
            response_str = _write_message(response)
            logger.debug(f"Sent response: {response_str}")

        except Exception as e:
            logger.error(f"Error in main loop: {e}", exc_info=True)
            _write_message({
                "jsonrpc": "2.0",
                "id": None,
                "error": {
                    "code": JSONRPC_INTERNAL_ERROR,
                    "message": f"Internal error: {str(e)}"
                }
            })

    # Cleanup
    executor.shutdown(wait=True)


if __name__ == "__main__":
    # Add a simple test mode
    if len(sys.argv) > 1 and sys.argv[1] == "--test":
        logger.info("Running in test mode")
        logger.info(f"Python version: {sys.version}")
        logger.info(f"OPENAI_API_KEY set: {'OPENAI_API_KEY' in os.environ}")
        logger.info(f"QDRANT_URL: {os.environ.get('QDRANT_URL', 'http://localhost:6333')}")
        logger.info(f"COLLECTION_NAME: {os.environ.get('COLLECTION_NAME', 'kindash-codebase-openai')}")
        try:
            server = MCPServer()
            logger.info("Server initialized successfully")
            sys.exit(0)
        except Exception as e:
            logger.error(f"Failed to initialize server: {e}")
            sys.exit(1)

    try:
        logger.info("Starting MCP Qdrant OpenAI server...")
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/steiner385/qdrant-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.