Skip to main content
Glama
operations.py15.2 kB
# Copyright 2024 Heinrich Krupp # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Core operations for code execution interface. Provides token-efficient functions for memory operations: - search: Semantic search with compact results - store: Store new memories with minimal parameters - health: Service health and status check Token Efficiency: - search(5 results): ~385 tokens (vs ~2,625, 85% reduction) - store(): ~15 tokens (vs ~150, 90% reduction) - health(): ~20 tokens (vs ~125, 84% reduction) Performance: - Cold call: ~50ms (storage initialization) - Warm call: ~5-10ms (connection reused) - Memory overhead: <10MB """ import logging import time from typing import Optional, Union, List from .types import ( CompactMemory, CompactSearchResult, CompactHealthInfo, CompactConsolidationResult, CompactSchedulerStatus ) from .client import get_storage_async, get_consolidator, get_scheduler from .sync_wrapper import sync_wrapper from ..models.memory import Memory from ..utils.hashing import generate_content_hash logger = logging.getLogger(__name__) @sync_wrapper async def search( query: str, limit: int = 5, tags: Optional[List[str]] = None ) -> CompactSearchResult: """ Search memories using semantic similarity. Token efficiency: ~25 tokens (query + params) + ~73 tokens per result Example (5 results): ~385 tokens vs ~2,625 tokens (85% reduction) Args: query: Search query text (natural language) limit: Maximum number of results to return (default: 5) tags: Optional list of tags to filter results Returns: CompactSearchResult with minimal memory representations Raises: RuntimeError: If storage backend is not available ValueError: If query is empty or limit is invalid Example: >>> from mcp_memory_service.api import search >>> results = search("recent architecture changes", limit=3) >>> print(results) SearchResult(found=3, shown=3) >>> for m in results.memories: ... print(f"{m.hash}: {m.preview[:50]}...") abc12345: Implemented OAuth 2.1 authentication for... def67890: Refactored storage backend to support... ghi11121: Added hybrid mode for Cloudflare sync... Performance: - First call: ~50ms (includes storage initialization) - Subsequent calls: ~5-10ms (connection reused) - Scales linearly with limit (5ms + 1ms per result) """ # Validate input if not query or not query.strip(): raise ValueError("Query cannot be empty") if limit < 1: raise ValueError("Limit must be at least 1") if limit > 100: logger.warning(f"Large limit ({limit}) may impact performance") # Get storage instance storage = await get_storage_async() # Perform semantic search query_results = await storage.retrieve(query, n_results=limit) # Filter by tags if specified if tags: tag_set = set(tags) query_results = [ r for r in query_results if any(tag in tag_set for tag in r.memory.tags) ] # Convert to compact format compact_memories = tuple( CompactMemory( hash=r.memory.content_hash[:8], # 8-char hash preview=r.memory.content[:200], # First 200 chars tags=tuple(r.memory.tags), # Immutable tuple created=r.memory.created_at, # Unix timestamp score=r.relevance_score # Relevance score ) for r in query_results ) return CompactSearchResult( memories=compact_memories, total=len(compact_memories), query=query ) @sync_wrapper async def store( content: str, tags: Optional[Union[str, List[str]]] = None, memory_type: str = "note" ) -> str: """ Store a new memory. Token efficiency: ~15 tokens (params only) vs ~150 tokens for MCP tool call with schema (90% reduction) Args: content: Memory content text tags: Single tag or list of tags (optional) memory_type: Memory type classification (default: "note") Returns: 8-character content hash of stored memory Raises: RuntimeError: If storage operation fails ValueError: If content is empty Example: >>> from mcp_memory_service.api import store >>> hash = store( ... "Implemented OAuth 2.1 authentication", ... tags=["authentication", "security", "feature"] ... ) >>> print(f"Stored: {hash}") Stored: abc12345 Performance: - First call: ~50ms (includes storage initialization) - Subsequent calls: ~10-20ms (includes embedding generation) - Scales with content length (20ms + 0.5ms per 100 chars) """ # Validate input if not content or not content.strip(): raise ValueError("Content cannot be empty") # Normalize tags to list if tags is None: tag_list = [] elif isinstance(tags, str): tag_list = [tags] else: tag_list = list(tags) # Generate content hash content_hash = generate_content_hash(content) # Create memory object memory = Memory( content=content, content_hash=content_hash, tags=tag_list, memory_type=memory_type, metadata={} ) # Get storage instance storage = await get_storage_async() # Store memory success, message = await storage.store(memory) if not success: raise RuntimeError(f"Failed to store memory: {message}") # Return short hash (8 chars) return content_hash[:8] @sync_wrapper async def health() -> CompactHealthInfo: """ Get service health and status. Token efficiency: ~20 tokens vs ~125 tokens for MCP health check tool (84% reduction) Returns: CompactHealthInfo with backend, count, and ready status Raises: RuntimeError: If unable to retrieve health information Example: >>> from mcp_memory_service.api import health >>> info = health() >>> print(f"Status: {info.status}") Status: healthy >>> print(f"Backend: {info.backend}, Count: {info.count}") Backend: sqlite_vec, Count: 1247 Performance: - First call: ~50ms (includes storage initialization) - Subsequent calls: ~5ms (cached stats) """ try: # Get storage instance storage = await get_storage_async() # Get storage statistics stats = await storage.get_stats() # Determine status status = "healthy" if stats.get("status") == "degraded": status = "degraded" elif stats.get("status") == "error": status = "error" elif not stats.get("initialized", True): status = "error" # Extract backend type backend = stats.get("storage_backend", "unknown") # Extract memory count count = stats.get("total_memories", 0) return CompactHealthInfo( status=status, count=count, backend=backend ) except Exception as e: logger.error(f"Health check failed: {e}") return CompactHealthInfo( status="error", count=0, backend="unknown" ) async def _consolidate_async(time_horizon: str) -> CompactConsolidationResult: """ Internal async implementation of consolidation. This function contains the core consolidation logic and is used by both the sync-wrapped API function and the FastAPI endpoint to avoid duplication. """ # Validate time horizon valid_horizons = ['daily', 'weekly', 'monthly', 'quarterly', 'yearly'] if time_horizon not in valid_horizons: raise ValueError( f"Invalid time_horizon: {time_horizon}. " f"Must be one of: {', '.join(valid_horizons)}" ) # Get consolidator instance consolidator = get_consolidator() if consolidator is None: raise RuntimeError( "Consolidator not available. " "Consolidation requires HTTP server with MCP_CONSOLIDATION_ENABLED=true. " "Start the HTTP server first." ) try: # Record start time start_time = time.time() # Run consolidation logger.info(f"Running {time_horizon} consolidation...") result = await consolidator.consolidate(time_horizon) # Calculate duration duration = time.time() - start_time # Extract metrics from result (ConsolidationReport object) processed = result.memories_processed compressed = result.memories_compressed forgotten = result.memories_archived status = 'completed' if not result.errors else 'completed_with_errors' logger.info( f"🎉 Consolidation completed successfully! Processed: {processed}, Compressed: {compressed}, Forgotten: {forgotten} (Total time: {duration:.1f}s)" ) return CompactConsolidationResult( status=status, horizon=time_horizon, processed=processed, compressed=compressed, forgotten=forgotten, duration=duration ) except Exception as e: logger.error(f"Consolidation failed: {e}") return CompactConsolidationResult( status="failed", horizon=time_horizon, processed=0, compressed=0, forgotten=0, duration=0.0 ) @sync_wrapper async def consolidate(time_horizon: str = "weekly") -> CompactConsolidationResult: """ Trigger memory consolidation for a specific time horizon. Token efficiency: ~40 tokens (result only) vs ~250 tokens for MCP consolidation result (84% reduction) Args: time_horizon: Time horizon for consolidation ('daily' | 'weekly' | 'monthly' | 'quarterly' | 'yearly') Returns: CompactConsolidationResult with operation metrics Raises: RuntimeError: If consolidation fails or consolidator not available ValueError: If time_horizon is invalid Example: >>> from mcp_memory_service.api import consolidate >>> result = consolidate('weekly') >>> print(result) Consolidation(completed, weekly, 2418 processed) >>> print(f"Compressed: {result.compressed}, Forgotten: {result.forgotten}") Compressed: 156, Forgotten: 43 Performance: - Typical duration: 10-30 seconds (depends on memory count) - Scales linearly with total memories (~10ms per memory) - Background operation (non-blocking in HTTP server context) Note: Requires HTTP server with consolidation enabled. If called when HTTP server is not running, will raise RuntimeError. """ return await _consolidate_async(time_horizon) async def _scheduler_status_async() -> CompactSchedulerStatus: """ Internal async implementation of scheduler status. This function contains the core status logic and is used by both the sync-wrapped API function and the FastAPI endpoint to avoid duplication. """ # Get scheduler instance scheduler = get_scheduler() if scheduler is None: logger.warning("Scheduler not available") return CompactSchedulerStatus( running=False, next_daily=None, next_weekly=None, next_monthly=None, jobs_executed=0, jobs_failed=0 ) try: # Get scheduler status if hasattr(scheduler, 'scheduler') and scheduler.scheduler is not None: # Scheduler is running jobs = scheduler.scheduler.get_jobs() # Extract next run times for each horizon next_daily = None next_weekly = None next_monthly = None for job in jobs: if job.next_run_time: timestamp = job.next_run_time.timestamp() if 'daily' in job.id.lower(): next_daily = timestamp elif 'weekly' in job.id.lower(): next_weekly = timestamp elif 'monthly' in job.id.lower(): next_monthly = timestamp # Get execution statistics jobs_executed = scheduler.execution_stats.get('successful_jobs', 0) jobs_failed = scheduler.execution_stats.get('failed_jobs', 0) return CompactSchedulerStatus( running=True, next_daily=next_daily, next_weekly=next_weekly, next_monthly=next_monthly, jobs_executed=jobs_executed, jobs_failed=jobs_failed ) else: # Scheduler exists but not running return CompactSchedulerStatus( running=False, next_daily=None, next_weekly=None, next_monthly=None, jobs_executed=0, jobs_failed=0 ) except Exception as e: logger.error(f"Failed to get scheduler status: {e}") return CompactSchedulerStatus( running=False, next_daily=None, next_weekly=None, next_monthly=None, jobs_executed=0, jobs_failed=0 ) @sync_wrapper async def scheduler_status() -> CompactSchedulerStatus: """ Get consolidation scheduler status and next run times. Token efficiency: ~25 tokens vs ~150 tokens for MCP scheduler_status tool (83% reduction) Returns: CompactSchedulerStatus with scheduler state and job statistics Raises: RuntimeError: If scheduler not available Example: >>> from mcp_memory_service.api import scheduler_status >>> status = scheduler_status() >>> print(status) Scheduler(running, executed=42, failed=0) >>> if status.next_daily: ... from datetime import datetime ... next_run = datetime.fromtimestamp(status.next_daily) ... print(f"Next daily: {next_run}") Performance: - Execution time: <5ms (reads cached state) - No storage access required - Lightweight status query Note: Requires HTTP server with consolidation scheduler enabled. Returns STOPPED status if scheduler not running. """ return await _scheduler_status_async()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server