Skip to main content
Glama

Claude Slack

message_store.pyโ€ข30.7 kB
#!/usr/bin/env python3 """ MessageStore: Unified storage abstraction that coordinates SQLite and Qdrant. This is the single entry point for all message storage and retrieval operations. """ import os import sys import math from typing import Dict, List, Optional, Any, Union from datetime import datetime # Add parent directory to path to import log_manager sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) try: from log_manager import get_logger except ImportError: import logging def get_logger(name, component=None): return logging.getLogger(name) from .sqlite_store import SQLiteStore from .qdrant_store import QdrantStore from ..ranking import RankingProfiles, RankingProfile from ..channels.manager import ChannelManager class MessageStore: """ Unified storage abstraction that coordinates between SQLite and Qdrant. Responsibilities: - Single transaction boundary for message storage - Coordinates SQLite (source of truth) and Qdrant (vector search) - Handles fallback logic when semantic search is unavailable - Ensures consistency between stores """ def __init__(self, db_path: str, qdrant_config: Optional[Dict] = None): """ Initialize the unified message store. Args: db_path: Path to SQLite database qdrant_config: Optional Qdrant configuration: { 'qdrant_path': str, # Local path 'qdrant_url': str, # Remote URL 'qdrant_api_key': str, # API key 'embedding_model': str # Model name } """ # Initialize logger self.logger = get_logger('MessageStore', component='manager') # Initialize SQLite store (always required) self.sqlite = SQLiteStore(db_path) self.logger.info(f"Initialized SQLite store at {db_path}") # Initialize Qdrant store (optional) self.qdrant = None if qdrant_config: try: self.qdrant = QdrantStore(**qdrant_config) self.logger.info("Qdrant store initialized - semantic search enabled") except Exception as e: self.logger.warning(f"Failed to initialize Qdrant: {e} - continuing with SQLite only") # Continue without semantic search else: self.logger.debug("No Qdrant config provided - using SQLite only") async def initialize(self): """Initialize both stores""" self.logger.info("Initializing MessageStore") await self.sqlite.initialize() self.logger.info("MessageStore initialization complete") async def close(self): """Close all connections""" self.logger.info("Closing MessageStore connections") await self.sqlite.close() if self.qdrant: self.qdrant.close() self.logger.info("All connections closed") def has_semantic_search(self) -> bool: """Check if semantic search is available""" return self.qdrant is not None # ============================================================================ # Message Storage # ============================================================================ async def send_message(self, channel_id: str, sender_id: str, sender_project_id: Optional[str], content: str, metadata: Optional[Dict] = None, thread_id: Optional[str] = None, auto_register_sender: bool = True) -> int: """ Store a message in both SQLite and Qdrant. This is the primary storage method that ensures consistency. SQLite is the source of truth; Qdrant indexing is best-effort. Args: channel_id: Target channel sender_id: Sender agent name sender_project_id: Sender's project ID content: Message content metadata: Optional nested metadata thread_id: Optional thread ID auto_register_sender: If sender is unregistered, will be automatically provisioned Returns: Message ID """ # Register sender if applicable if auto_register_sender: try: await self.sqlite.register_agent( name=sender_id, project_id=sender_project_id, status= 'online' ) self.logger.debug(f"Sender {sender_id} autoregistered in project {project_id}") except Exception as e: self.logger.error(f"Auto-register error: {e}") # Extract confidence from metadata if present confidence = None if metadata and isinstance(metadata, dict): confidence = metadata.get('confidence') # Ensure the channel exists self.logger.debug(f"Sending message to channel {channel_id} from {sender_id}") channel = await self.sqlite.get_channel(channel_id) if not channel: self.logger.error(f"Channel '{channel_id}' does not exist") raise ValueError(f"Channel '{channel_id}' does not exist") # Store in SQLite (source of truth) message_id = await self.sqlite.send_message( channel_id=channel_id, sender_id=sender_id, sender_project_id=sender_project_id, content=content, metadata=metadata, thread_id=thread_id ) self.logger.debug(f"Message {message_id} stored in SQLite") # Index in Qdrant for semantic search (best-effort) if self.qdrant: try: # Get the timestamp from the just-created message message = await self.sqlite.get_message(message_id) # Timestamp is now stored as Unix timestamp (float) timestamp = message['timestamp'] # Already a float/Unix timestamp await self.qdrant.index_message( message_id=message_id, content=content, channel_id=channel_id, sender_id=sender_id, timestamp=timestamp, # Pass as-is, will be handled by index_message metadata=metadata, confidence=confidence, sender_project_id=sender_project_id ) self.logger.debug(f"Message {message_id} indexed in Qdrant") except Exception as e: # Log but don't fail the message send self.logger.warning(f"Failed to index message {message_id} in Qdrant: {e}") self.logger.info(f"Message {message_id} sent successfully to {channel_id}") return message_id # ============================================================================ # Message Retrieval # ============================================================================ async def get_message(self, message_id: int, agent_name: Optional[str] = None, agent_project_id: Optional[str] = None) -> Optional[Dict]: """ Get a specific message by ID. Args: message_id: The message ID agent_name: Optional agent requesting (for permission check) agent_project_id: Optional agent's project ID Returns: Message dict or None if not found/not accessible """ self.logger.debug(f"Retrieving message {message_id}") return await self.sqlite.get_message(message_id) async def get_agent_messages(self, agent_name: str, agent_project_id: Optional[str] = None, channel_id: Optional[str] = None, limit: int = 100, since: Optional[datetime] = None) -> List[Dict]: """ Get messages visible to a specific agent (with permission checks). This method enforces SQLite-level permissions, only returning messages from channels the agent has access to. Args: agent_name: Agent requesting messages (for permissions) agent_project_id: Agent's project ID channel_id: Optional specific channel filter limit: Maximum messages since: Only messages after this timestamp Returns: List of message dictionaries visible to the agent """ # Normalize channel ID if channel_id: self.logger.debug(f"Normalizing channel ID: {channel_id}") channel_id = ChannelManager.normalize_channel_id( channel_id, project_id=agent_project_id ) # Delegate to SQLite with agent context for permission enforcement self.logger.debug(f"Getting messages for agent {agent_name} (project: {agent_project_id})") return await self.sqlite.get_messages( agent_name=agent_name, agent_project_id=agent_project_id, channel_id=channel_id, limit=limit, since=since ) async def get_messages(self, channel_ids: Optional[List[str]] = None, sender_ids: Optional[List[str]] = None, message_ids: Optional[List[int]] = None, limit: int = 100, since: Optional[datetime] = None) -> List[Dict]: """ Get messages without permission checks (administrative access). This method bypasses agent permissions and retrieves messages directly. Use this for system operations, data migration, or administrative tasks. Args: channel_ids: Optional list of channel IDs to filter sender_ids: Optional list of sender IDs to filter message_ids: Optional list of specific message IDs limit: Maximum messages since: Only messages after this timestamp Returns: List of message dictionaries (no permission filtering) """ # Use the new admin method that bypasses permissions self.logger.debug(f"Getting messages (admin mode) - channels: {channel_ids}, limit: {limit}") return await self.sqlite.get_messages_admin( channel_ids=channel_ids, sender_ids=sender_ids, message_ids=message_ids, limit=limit, since=since ) async def get_messages_by_ids(self, message_ids: List[int], agent_name: Optional[str] = None, agent_project_id: Optional[str] = None) -> List[Dict]: """ Get multiple messages by their IDs. Args: message_ids: List of message IDs to retrieve agent_name: Optional agent requesting (for permission check) agent_project_id: Optional agent's project ID Returns: List of message dictionaries """ self.logger.debug(f"Getting {len(message_ids)} messages by IDs") return await self.sqlite.get_messages_by_ids( message_ids=message_ids, agent_name=agent_name, agent_project_id=agent_project_id ) # ============================================================================ # Search Operations # ============================================================================ async def search_messages(self, query: Optional[str] = None, project_ids: Optional[List[str]] = None, channel_ids: Optional[List[str]] = None, sender_ids: Optional[List[str]] = None, metadata_filters: Optional[Dict[str, Any]] = None, min_confidence: Optional[float] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, limit: int = 20, ranking_profile: Union[str, RankingProfile, Dict] = "balanced") -> List[Dict]: """ Search messages with optional semantic similarity. This method intelligently routes between: - Semantic search (Qdrant) when query is provided and available - Filter search (SQLite) for pure filtering operations - Hybrid approach combining both Args: query: Semantic search query (optional) project_ids: Filter by projects (None = all, [] = none, ["global"] = global only) channel_ids: Filter by specific channels (overrides project filtering) sender_ids: Filter by senders metadata_filters: Arbitrary nested metadata filters with MongoDB-style operators min_confidence: Minimum confidence threshold since: Only return messages after this timestamp until: Only return messages before this timestamp limit: Maximum results ranking_profile: Ranking profile for semantic search Returns: List of messages with scores (if semantic) """ # Handle project-based channel filtering if channel_ids is None and project_ids is not None: # If empty list, return no results if len(project_ids) == 0: self.logger.debug("Empty project_ids list, returning no results") return [] # Get channels for specified projects self.logger.debug(f"Getting channels for projects: {project_ids}") channel_ids = await self._get_channels_for_projects(project_ids) if not channel_ids: self.logger.debug(f"No channels found for projects: {project_ids}") return [] # No channels found for specified projects if query and self.qdrant: # Semantic search via Qdrant self.logger.info(f"Performing semantic search: query='{query[:50]}...', limit={limit}") results = await self._semantic_search( query=query, channel_ids=channel_ids, sender_ids=sender_ids, metadata_filters=metadata_filters, min_confidence=min_confidence, since=since, until=until, limit=limit, ranking_profile=ranking_profile ) else: # Filter-based search via SQLite self.logger.info(f"Performing filter-based search (no semantic): limit={limit}") results = await self._filter_search( channel_ids=channel_ids, sender_ids=sender_ids, metadata_filters=metadata_filters, min_confidence=min_confidence, since=since, until=until, limit=limit ) self.logger.debug(f"Search returned {len(results)} results") return results async def search_agent_messages(self, agent_name: str, agent_project_id: Optional[str] = None, query: Optional[str] = None, channel_ids: Optional[List[str]] = None, sender_ids: Optional[List[str]] = None, metadata_filters: Optional[Dict[str, Any]] = None, min_confidence: Optional[float] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, limit: int = 20, ranking_profile: Union[str, RankingProfile, Dict] = "balanced") -> List[Dict]: """ Search messages with agent permission checks. This method ensures the agent can only search messages in channels they have access to. It first gets the agent's accessible channels, then performs the search within that scope. Args: agent_name: Agent performing the search (for permissions) agent_project_id: Agent's project ID query: Semantic search query (optional) channel_ids: Filter by specific channels (will be intersected with accessible channels) sender_ids: Filter by senders metadata_filters: Arbitrary nested metadata filters min_confidence: Minimum confidence threshold limit: Maximum results ranking_profile: Ranking profile for semantic search Returns: List of messages with scores (if semantic) that agent has permission to see """ # Normalize channel IDs _channel_ids = [] if channel_ids: for channel_id in channel_ids: _channel_id = ChannelManager.normalize_channel_id( channel_id, project_id=agent_project_id ) _channel_ids.append(_channel_id) # Get channels accessible to the agent self.logger.debug(f"Getting accessible channels for agent {agent_name}") accessible_channels = await self.sqlite.get_agent_channels( agent_name=agent_name, agent_project_id=agent_project_id, include_archived=False ) accessible_channel_ids = [ch['id'] for ch in accessible_channels] # If no accessible channels, return empty if not accessible_channel_ids: self.logger.debug(f"Agent {agent_name} has no accessible channels") return [] # If channel_ids specified, intersect with accessible channels if _channel_ids: # Only search in channels that are both specified AND accessible search_channel_ids = list(set(_channel_ids) & set(accessible_channel_ids)) if not search_channel_ids: self.logger.debug("No overlap between requested and accessible channels") return [] # No overlap between requested and accessible channels else: # Search all accessible channels search_channel_ids = accessible_channel_ids # Now perform the search with the permission-filtered channel list if query and self.qdrant: # Semantic search via Qdrant self.logger.info(f"Agent {agent_name} semantic search: query='{query[:50]}...', channels={len(search_channel_ids)}") results = await self._semantic_search( query=query, channel_ids=search_channel_ids, sender_ids=sender_ids, metadata_filters=metadata_filters, min_confidence=min_confidence, since=since, until=until, limit=limit, ranking_profile=ranking_profile ) else: # Filter-based search via SQLite self.logger.info(f"Agent {agent_name} filter search: channels={len(search_channel_ids)}") results = await self._filter_search_agent( agent_name=agent_name, agent_project_id=agent_project_id, channel_ids=search_channel_ids, sender_ids=sender_ids, metadata_filters=metadata_filters, min_confidence=min_confidence, since=since, until=until, limit=limit ) self.logger.debug(f"Agent search returned {len(results)} results for {agent_name}") return results async def _semantic_search(self, query: str, channel_ids: Optional[List[str]] = None, sender_ids: Optional[List[str]] = None, metadata_filters: Optional[Dict[str, Any]] = None, min_confidence: Optional[float] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, limit: int = 20, ranking_profile: Union[str, RankingProfile, Dict] = "balanced") -> List[Dict]: """ Perform semantic search using Qdrant + SQLite. Strategy: 1. Search in Qdrant for semantically similar messages 2. Retrieve full message data from SQLite 3. Apply time decay and confidence ranking 4. Return ranked results """ # Get ranking profile if isinstance(ranking_profile, str): profile = RankingProfiles.get_profile(ranking_profile) elif isinstance(ranking_profile, dict): profile = RankingProfile(**ranking_profile) else: profile = ranking_profile # Search in Qdrant self.logger.debug(f"Querying Qdrant with ranking profile: {ranking_profile}") qdrant_results = await self.qdrant.search( query=query, channel_ids=channel_ids, sender_ids=sender_ids, metadata_filters=metadata_filters, min_confidence=min_confidence, since=since, until=until, limit=limit * 3 # Get extra for re-ranking ) if not qdrant_results: self.logger.debug("No results from Qdrant") return [] # Extract message IDs and scores message_scores = {} message_payloads = {} for msg_id, score, payload in qdrant_results: message_scores[msg_id] = score message_payloads[msg_id] = payload # Retrieve full messages from SQLite message_ids = list(message_scores.keys()) self.logger.debug(f"Retrieving {len(message_ids)} messages from SQLite") messages = await self.sqlite.get_messages_by_ids(message_ids) # Create ID to message mapping id_to_message = {msg['id']: msg for msg in messages} # Calculate final scores with time decay now = datetime.now() scored_results = [] for msg_id, similarity_score in message_scores.items(): if msg_id not in id_to_message: continue # Message might have been deleted msg = id_to_message[msg_id] payload = message_payloads[msg_id] # Parse timestamp - work with Unix timestamps directly from api.utils.time_utils import to_timestamp, now_timestamp timestamp = msg['timestamp'] msg_ts = to_timestamp(timestamp) # Handles both Unix and ISO formats now_ts = now_timestamp() # Calculate time decay using Unix timestamps (in hours) age_hours = max(0, (now_ts - msg_ts) / 3600) decay_score = self._calculate_decay(age_hours, profile.decay_half_life_hours) # Get confidence score confidence_score = payload.get('confidence', 0.5) # Calculate final score total_weight = (profile.similarity_weight + profile.confidence_weight + profile.decay_weight) if total_weight > 0: final_score = ( (similarity_score * profile.similarity_weight + confidence_score * profile.confidence_weight + decay_score * profile.decay_weight) / total_weight ) else: final_score = similarity_score # Add scoring details msg['search_scores'] = { 'final_score': final_score, 'similarity': similarity_score, 'confidence': confidence_score, 'recency': decay_score, 'age_hours': age_hours } scored_results.append((final_score, msg)) # Sort by final score and limit scored_results.sort(key=lambda x: x[0], reverse=True) final_results = [msg for _, msg in scored_results[:limit]] self.logger.debug(f"Semantic search returning {len(final_results)} results after ranking") return final_results async def _get_channels_for_projects(self, project_ids: List[str]) -> List[str]: """ Get all channel IDs for the specified project IDs. Args: project_ids: List of project IDs (can include "global") Returns: List of channel IDs belonging to those projects """ # Use the new efficient method self.logger.debug(f"Getting channels for {len(project_ids)} projects") channels = await self.sqlite.get_channels_by_projects(project_ids) self.logger.debug(f"Found {len(channels)} channels for specified projects") return [ch['id'] for ch in channels] async def _filter_search(self, channel_ids: Optional[List[str]] = None, sender_ids: Optional[List[str]] = None, metadata_filters: Optional[Dict[str, Any]] = None, min_confidence: Optional[float] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, limit: int = 20) -> List[Dict]: """ Perform filter-based search using SQLite with MongoDB-style filtering. This is used when no query is provided or Qdrant is unavailable. """ # Use the advanced search method with MongoDB-style filters self.logger.debug(f"Performing advanced filter search with metadata_filters: {metadata_filters}") return await self.sqlite.search_messages_advanced( channel_ids=channel_ids, sender_ids=sender_ids, metadata_filters=metadata_filters, min_confidence=min_confidence, since=since, until=until, limit=limit ) async def _filter_search_agent(self, agent_name: str, agent_project_id: Optional[str] = None, channel_ids: Optional[List[str]] = None, sender_ids: Optional[List[str]] = None, metadata_filters: Optional[Dict[str, Any]] = None, min_confidence: Optional[float] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, limit: int = 20) -> List[Dict]: """ Perform filter-based search with agent permissions using SQLite with MongoDB-style filtering. This is used when no semantic query is provided or Qdrant is unavailable, but we still need to respect agent permissions. """ # Use the advanced search method with agent context for permissions self.logger.debug(f"Performing advanced filter search for agent {agent_name}") return await self.sqlite.search_messages_advanced( agent_name=agent_name, agent_project_id=agent_project_id, channel_ids=channel_ids, sender_ids=sender_ids, metadata_filters=metadata_filters, min_confidence=min_confidence, since=since, until=until, limit=limit ) def _calculate_decay(self, age_hours: float, half_life_hours: float) -> float: """Calculate exponential decay score based on age""" if age_hours < 0: return 1.0 # Future messages get max score if half_life_hours <= 0: return 0.0 # Invalid half-life # Prevent overflow for very large ratios ratio = age_hours / half_life_hours if ratio > 100: # Message is >100 half-lives old return 0.0 return math.exp(-math.log(2) * ratio) # ============================================================================ # Magic Method: Pass-through to SQLite Store # ============================================================================ def __getattr__(self, name): """ Proxy missing methods to the SQLite store. This eliminates the need for dozens of boilerplate delegation methods. Any method not explicitly defined in MessageStore will be forwarded to SQLiteStore. Methods that MessageStore overrides (like send_message, search_messages) are handled directly and won't trigger this proxy. Examples of proxied methods: - Project management: register_project, get_project, list_projects - Agent management: register_agent, get_agent, get_discoverable_agents - Channel management: create_channel, get_channel, add_channel_member - Session management: register_session, get_session - Tool deduplication: record_tool_call, get_recent_tool_calls """ # Check if SQLiteStore has the requested method if hasattr(self.sqlite, name): return getattr(self.sqlite, name) # If not found, raise AttributeError raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/theo-nash/claude-slack'

If you have feedback or need assistance with the MCP directory API, please join our Discord server