HubSpot MCP Server

by peakmojo
faiss_manager.py (11.7 kB)
import os
import logging
import glob
from datetime import datetime, timedelta
import faiss
import numpy as np
import json
from typing import Dict, List, Optional, Any, Tuple

logger = logging.getLogger("mcp_hubspot_faiss_manager")


class FaissManager:
    """Manager for FAISS indexes that handles rolling storage by day."""

    def __init__(self, storage_dir: str = "/storage", max_days: int = 7, embedding_dimension: int = 384):
        """Initialize the FAISS manager.

        Args:
            storage_dir: Directory to store FAISS index files
            max_days: Maximum number of days to keep in storage
            embedding_dimension: Dimension of the embedding vectors (default: 384 for all-MiniLM-L6-v2)
        """
        self.storage_dir = storage_dir
        self.max_days = max_days
        self.embedding_dimension = embedding_dimension
        self.indexes: Dict[str, faiss.Index] = {}
        self.metadata: Dict[str, List[Dict[str, Any]]] = {}

        # Ensure storage directory exists
        self._ensure_storage_dir()

        # Load existing indexes or create new ones
        self._initialize_indexes()

    def _ensure_storage_dir(self) -> None:
        """Create storage directory if it doesn't exist."""
        if not os.path.exists(self.storage_dir):
            logger.info(f"Creating storage directory: {self.storage_dir}")
            os.makedirs(self.storage_dir, exist_ok=True)

    def _get_index_path(self, date_str: str) -> str:
        """Get the path for a specific index file.

        Args:
            date_str: Date string in YYYY-MM-DD format

        Returns:
            Path to the index file
        """
        return os.path.join(self.storage_dir, f"index_{date_str}.faiss")

    def _get_metadata_path(self, date_str: str) -> str:
        """Get the path for a specific metadata file.

        Args:
            date_str: Date string in YYYY-MM-DD format

        Returns:
            Path to the metadata file
        """
        return os.path.join(self.storage_dir, f"metadata_{date_str}.json")

    def _get_today_date_str(self) -> str:
        """Get today's date as a string in YYYY-MM-DD format."""
        return datetime.now().strftime("%Y-%m-%d")

    def _initialize_indexes(self) -> None:
        """Initialize indexes by loading existing ones or creating new ones."""
        # Get list of existing index files
        index_files = glob.glob(os.path.join(self.storage_dir, "index_*.faiss"))

        if index_files:
            logger.info(f"Found {len(index_files)} existing index files")

            # Extract dates from filenames and sort them
            dates = []
            for file_path in index_files:
                filename = os.path.basename(file_path)
                # Extract date from filename (format: index_YYYY-MM-DD.faiss)
                date_str = filename[6:-6]  # Remove "index_" prefix and ".faiss" suffix
                try:
                    date = datetime.strptime(date_str, "%Y-%m-%d")
                    dates.append((date, date_str))
                except ValueError:
                    logger.warning(f"Skipping invalid index filename: {filename}")

            # Sort dates in descending order (newest first)
            dates.sort(reverse=True)

            # Keep only the most recent max_days indexes
            recent_dates = dates[:self.max_days]

            # Remove older index files
            for date, date_str in dates[self.max_days:]:
                self._remove_index(date_str)

            # Load the recent indexes
            for _, date_str in recent_dates:
                self._load_index(date_str)

        # Create today's index if it doesn't exist
        today = self._get_today_date_str()
        if today not in self.indexes:
            self._create_new_index(today)

    def _load_index(self, date_str: str) -> None:
        """Load an index and its metadata from disk.

        Args:
            date_str: Date string in YYYY-MM-DD format
        """
        index_path = self._get_index_path(date_str)
        metadata_path = self._get_metadata_path(date_str)

        try:
            # Load FAISS index
            index = faiss.read_index(index_path)

            # Load metadata
            if os.path.exists(metadata_path):
                with open(metadata_path, 'r') as f:
                    metadata = json.load(f)
            else:
                metadata = []

            # Store in memory
            self.indexes[date_str] = index
            self.metadata[date_str] = metadata

            logger.info(f"Loaded index for {date_str} with {index.ntotal} vectors")
        except Exception as e:
            logger.error(f"Failed to load index for {date_str}: {str(e)}")

    def _create_new_index(self, date_str: str) -> None:
        """Create a new empty index.

        Args:
            date_str: Date string in YYYY-MM-DD format
        """
        # Create a new index using the configured embedding dimension
        dimension = self.embedding_dimension
        logger.debug(f"Creating new index with dimension {dimension}")
        index = faiss.IndexFlatL2(dimension)

        # Store in memory
        self.indexes[date_str] = index
        self.metadata[date_str] = []

        logger.info(f"Created new empty index for {date_str} with dimension {dimension}")

    def _remove_index(self, date_str: str) -> None:
        """Remove an index and its metadata from disk and memory.

        Args:
            date_str: Date string in YYYY-MM-DD format
        """
        index_path = self._get_index_path(date_str)
        metadata_path = self._get_metadata_path(date_str)

        # Remove files
        try:
            if os.path.exists(index_path):
                os.remove(index_path)
            if os.path.exists(metadata_path):
                os.remove(metadata_path)
            logger.info(f"Removed old index for {date_str}")
        except Exception as e:
            logger.error(f"Failed to remove index for {date_str}: {str(e)}")

        # Remove from memory
        self.indexes.pop(date_str, None)
        self.metadata.pop(date_str, None)

    def save_all_indexes(self) -> None:
        """Save all indexes and metadata to disk."""
        for date_str in self.indexes:
            self._save_index(date_str)

    def save_today_index(self) -> None:
        """Save only today's index and metadata to disk."""
        today = self._get_today_date_str()
        logger.debug(f"Attempting to save today's index ({today})")
        if today in self.indexes:
            index_size = self.indexes[today].ntotal
            metadata_count = len(self.metadata[today])
            logger.debug(f"Today's index contains {index_size} vectors and {metadata_count} metadata items")
            self._save_index(today)
            logger.info(f"Saved today's index ({today}) with {index_size} vectors")
        else:
            logger.warning(f"Today's index ({today}) does not exist")

    def _save_index(self, date_str: str) -> None:
        """Save an index and its metadata to disk.

        Args:
            date_str: Date string in YYYY-MM-DD format
        """
        if date_str not in self.indexes:
            logger.warning(f"Cannot save non-existent index for {date_str}")
            return

        index_path = self._get_index_path(date_str)
        metadata_path = self._get_metadata_path(date_str)

        try:
            logger.debug(f"Saving FAISS index for {date_str} to {index_path}")
            # Save FAISS index
            faiss.write_index(self.indexes[date_str], index_path)
            logger.debug("FAISS index saved successfully")

            logger.debug(f"Saving metadata for {date_str} to {metadata_path} ({len(self.metadata[date_str])} items)")
            # Save metadata
            with open(metadata_path, 'w') as f:
                json.dump(self.metadata[date_str], f)
            logger.debug("Metadata saved successfully")

            logger.info(f"Saved index for {date_str} with {self.indexes[date_str].ntotal} vectors")
        except Exception as e:
            logger.error(f"Failed to save index for {date_str}: {str(e)}", exc_info=True)

    def add_data(self, vectors: np.ndarray, metadata_list: List[Dict[str, Any]]) -> None:
        """Add data to today's index.

        Args:
            vectors: NumPy array of vectors to add
            metadata_list: List of metadata dictionaries
        """
        today = self._get_today_date_str()
        logger.debug(f"Adding data to index for {today}")
        logger.debug(f"Vector data shape: {vectors.shape}, metadata list length: {len(metadata_list)}")

        # Create today's index if it doesn't exist
        if today not in self.indexes:
            logger.debug(f"Today's index ({today}) does not exist, creating new index")
            self._create_new_index(today)

        # Add vectors to the index
        logger.debug(f"Current index size before addition: {self.indexes[today].ntotal}")
        self.indexes[today].add(vectors)
        logger.debug(f"Current index size after addition: {self.indexes[today].ntotal}")

        # Add metadata
        current_metadata_count = len(self.metadata[today])
        logger.debug(f"Current metadata count before addition: {current_metadata_count}")
        self.metadata[today].extend(metadata_list)
        logger.debug(f"Current metadata count after addition: {len(self.metadata[today])}")

        logger.debug("Saving index after data addition")
        # Save the updated index
        self._save_index(today)

        logger.info(f"Added {len(vectors)} vectors to index for {today}, new total: {self.indexes[today].ntotal}")

    def search(self, query_vector: np.ndarray, k: int = 10) -> Tuple[List[Dict[str, Any]], List[float]]:
        """Search across all indexes for the most similar vectors.

        Args:
            query_vector: Query vector
            k: Number of results to return

        Returns:
            Tuple of (metadata_list, distances)
        """
        all_results = []

        # Ensure query_vector is properly shaped (1 x dim)
        if len(query_vector.shape) == 1:
            query_vector = query_vector.reshape(1, -1)

        for date_str, index in self.indexes.items():
            if index.ntotal == 0:
                continue

            # Search in this index
            distances, indices = index.search(query_vector, min(k, index.ntotal))

            # Get metadata for results
            for i, idx in enumerate(indices[0]):
                if idx != -1:  # -1 indicates no match found
                    result = {
                        "metadata": self.metadata[date_str][idx],
                        "distance": float(distances[0][i]),
                        "date": date_str
                    }
                    all_results.append(result)

        # Sort by distance (ascending)
        all_results.sort(key=lambda x: x["distance"])

        # Return top k results
        top_results = all_results[:k]

        # Separate metadata and distances
        metadata_list = [result["metadata"] for result in top_results]
        distances = [result["distance"] for result in top_results]

        return metadata_list, distances
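A minimal usage sketch of the class above, not part of the server itself. It assumes the sentence-transformers package and the all-MiniLM-L6-v2 model (which produces the 384-dimensional vectors the default index expects), that this file is importable as faiss_manager, and hypothetical sample texts; FAISS indexes take float32 input, hence the astype calls.

# Usage sketch: embed records, add them to today's index, then search.
import numpy as np
from sentence_transformers import SentenceTransformer

from faiss_manager import FaissManager

model = SentenceTransformer("all-MiniLM-L6-v2")
manager = FaissManager(storage_dir="./storage", max_days=7, embedding_dimension=384)

# Hypothetical HubSpot-style records to index.
texts = [
    "Contact: Jane Doe, jane@example.com, lifecycle stage: lead",
    "Company: Acme Inc., industry: manufacturing",
]
vectors = model.encode(texts).astype(np.float32)  # shape (2, 384)
manager.add_data(vectors, [{"text": t} for t in texts])

# Search spans all loaded daily indexes; lower L2 distance means a closer match.
query = model.encode(["manufacturing companies"]).astype(np.float32)
metadata, distances = manager.search(query, k=5)
for meta, dist in zip(metadata, distances):
    print(dist, meta["text"])

Note that IndexFlatL2 performs exact brute-force search, so every query scans all vectors in each day's index; the rolling max_days window keeps that cost bounded by discarding the oldest day's index and metadata files.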

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/peakmojo/mcp-hubspot'
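The same endpoint can be called from Python. A minimal sketch assuming only the requests package; it prints whatever JSON the endpoint returns, with no assumptions about the response shape:

import requests

resp = requests.get("https://glama.ai/api/mcp/v1/servers/peakmojo/mcp-hubspot")
resp.raise_for_status()  # surface HTTP errors
print(resp.json())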

If you have feedback or need assistance with the MCP directory API, please join our Discord server.