Skip to main content
Glama
bitgeese

Sequential Questioning MCP Server

by bitgeese
vector_db.py9.05 kB
from typing import List, Dict, Any, Optional, Union import numpy as np from qdrant_client import QdrantClient from qdrant_client.http import models from langchain_openai import OpenAIEmbeddings import os import time import httpx import uuid from app.core.config import get_settings from app.core.logging import app_logger from app.core.monitoring import measure_time settings = get_settings() class VectorDBService: """Service for interacting with Qdrant vector database.""" _instance = None def __new__(cls): """Singleton pattern to ensure only one vector db service instance.""" if cls._instance is None: cls._instance = super(VectorDBService, cls).__new__(cls) cls._instance._initialize() return cls._instance def _initialize(self): """Initialize Qdrant client and OpenAI embeddings.""" # Get the Qdrant URL from environment or use host:port if not available qdrant_url = os.getenv("QDRANT_URL", None) if qdrant_url: # Initialize with URL app_logger.info(f"Connecting to Qdrant using URL: {qdrant_url}") self.client = QdrantClient(url=qdrant_url) else: # Initialize with host and port app_logger.info(f"Connecting to Qdrant using Host: {settings.QDRANT_HOST}, Port: {settings.QDRANT_PORT}") self.client = QdrantClient( host=settings.QDRANT_HOST, port=settings.QDRANT_PORT ) self.collection_name = settings.QDRANT_COLLECTION_NAME self.embeddings = OpenAIEmbeddings( api_key=settings.OPENAI_API_KEY ) # Try to ensure collection exists with retry self._ensure_collection_exists_with_retry() app_logger.info(f"Vector DB service initialized with collection '{self.collection_name}'") def _ensure_collection_exists_with_retry(self, max_retries=5, retry_delay=2): """Create the collection if it doesn't exist, with retry logic.""" retries = 0 while retries < max_retries: try: self._ensure_collection_exists() return except (httpx.ConnectError, httpx.ConnectTimeout) as e: retries += 1 app_logger.warning(f"Failed to connect to Qdrant (attempt {retries}/{max_retries}): {str(e)}") if retries < max_retries: app_logger.info(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 1.5 # Exponential backoff else: app_logger.error(f"Failed to connect to Qdrant after {max_retries} attempts.") # Continue without vector DB functionality app_logger.warning("Application will continue without vector database functionality.") break def _ensure_collection_exists(self): """Create the collection if it doesn't exist.""" try: collections = self.client.get_collections().collections collection_names = [collection.name for collection in collections] if self.collection_name not in collection_names: self.client.create_collection( collection_name=self.collection_name, vectors_config=models.VectorParams( size=1536, # OpenAI embeddings dimension distance=models.Distance.COSINE ) ) app_logger.info(f"Created new collection '{self.collection_name}' in Qdrant") except Exception as e: app_logger.error(f"Error ensuring collection exists: {str(e)}") raise @measure_time async def store_embedding( self, text: str, metadata: Dict[str, Any], id: Optional[str] = None ) -> str: """Store a text embedding in the vector database. Args: text: The text to embed and store metadata: Associated metadata for the embedding id: Optional custom ID for the point Returns: ID of the stored point """ try: # Generate embedding embedding = await self._get_embedding(text) # Use provided ID or generate a proper UUID # Always ensure the ID is a UUID string format regardless of input if id is not None: # Try to convert the ID to UUID if it's not already try: # If it's a numeric ID or other non-UUID format, create a new UUID if not (id.startswith('{') and id.endswith('}') or '-' in id): point_id = str(uuid.uuid4()) else: # If it looks like a UUID, ensure it's properly formatted point_id = str(uuid.UUID(id)) except (ValueError, AttributeError, TypeError): # If conversion fails, generate a new UUID point_id = str(uuid.uuid4()) else: # Generate a new UUID if none provided point_id = str(uuid.uuid4()) # Upsert the point self.client.upsert( collection_name=self.collection_name, points=[ models.PointStruct( id=point_id, vector=embedding, payload=metadata ) ] ) app_logger.debug(f"Stored embedding with ID {point_id} in collection '{self.collection_name}'") return point_id except Exception as e: app_logger.error(f"Error storing embedding: {str(e)}") raw_response = getattr(e, "response", None) if raw_response: app_logger.error(f"Raw response content:\n{raw_response.content}") # Continue execution even if embedding storage fails # Return a fallback ID if we can't store the embedding return id if id is not None else f"fallback-{str(uuid.uuid4())}" @measure_time async def search_similar( self, query: str, filter_params: Optional[Dict[str, Any]] = None, limit: int = 5 ) -> List[Dict[str, Any]]: """Search for similar embeddings in the vector database. Args: query: The query text to search for filter_params: Optional filter parameters for search limit: Maximum number of results to return Returns: List of search results with score and payload """ try: # Generate embedding for query query_embedding = await self._get_embedding(query) # Prepare search filter search_filter = None if filter_params: search_filter = models.Filter( must=[ models.FieldCondition( key=key, match=models.MatchValue(value=value) ) for key, value in filter_params.items() ] ) # Perform search search_results = self.client.search( collection_name=self.collection_name, query_vector=query_embedding, limit=limit, query_filter=search_filter ) # Format results results = [] for result in search_results: results.append({ "id": result.id, "score": result.score, "payload": result.payload }) app_logger.debug(f"Found {len(results)} similar embeddings for query in '{self.collection_name}'") return results except Exception as e: app_logger.error(f"Error searching similar embeddings: {str(e)}") # Return empty results if search fails return [] async def _get_embedding(self, text: str) -> List[float]: """Get embedding for text using OpenAI. Args: text: The text to embed Returns: Embedding vector as list of floats """ try: return self.embeddings.embed_query(text) except Exception as e: app_logger.error(f"Error generating embedding: {str(e)}") # Return a zero embedding as fallback return [0.0] * 1536 # OpenAI embeddings are 1536-dimensional # Create vector db service instance vector_db_service = VectorDBService()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bitgeese/sequential-questioning'

If you have feedback or need assistance with the MCP directory API, please join our Discord server