# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MCP Memory Service
Copyright (c) 2024 Heinrich Krupp
Licensed under the MIT License. See LICENSE file in the project root for full license text.
"""
import sys
import os
import socket
import time
import logging
import psutil
# Client detection for environment-aware behavior
def detect_mcp_client():
"""Detect which MCP client is running this server."""
try:
# Get the parent process (the MCP client)
current_process = psutil.Process()
parent = current_process.parent()
if parent:
parent_name = parent.name().lower()
parent_exe = parent.exe() if hasattr(parent, 'exe') else ""
# Check for Claude Desktop
if 'claude' in parent_name or 'claude' in parent_exe.lower():
return 'claude_desktop'
# Check for LM Studio
if 'lmstudio' in parent_name or 'lm-studio' in parent_name or 'lmstudio' in parent_exe.lower():
return 'lm_studio'
# Check command line for additional clues
try:
cmdline = parent.cmdline()
cmdline_str = ' '.join(cmdline).lower()
if 'claude' in cmdline_str:
return 'claude_desktop'
if 'lmstudio' in cmdline_str or 'lm-studio' in cmdline_str:
return 'lm_studio'
except (OSError, IndexError, AttributeError) as e:
# The module-level `logger` is not defined yet when this runs at import time,
# so log through the logging module directly.
logging.getLogger(__name__).debug(f"Could not detect client from process: {e}")
# Fallback: check environment variables
if os.getenv('CLAUDE_DESKTOP'):
return 'claude_desktop'
if os.getenv('LM_STUDIO'):
return 'lm_studio'
# Default to Claude Desktop for strict JSON compliance
return 'claude_desktop'
except Exception:
# If detection fails, default to Claude Desktop (strict mode)
return 'claude_desktop'
# Detect the current MCP client
MCP_CLIENT = detect_mcp_client()
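# Detection is best-effort: the parent process name/exe is checked first, then the parent
# command line, then the CLAUDE_DESKTOP / LM_STUDIO environment variables, defaulting to
# Claude Desktop's strict JSON mode. Setting one of those variables forces a client type.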
# Custom logging handler that routes INFO/DEBUG to stdout, WARNING/ERROR to stderr
class DualStreamHandler(logging.Handler):
"""Client-aware handler that adjusts logging behavior based on MCP client."""
def __init__(self, client_type='claude_desktop'):
super().__init__()
self.client_type = client_type
self.stdout_handler = logging.StreamHandler(sys.stdout)
self.stderr_handler = logging.StreamHandler(sys.stderr)
# Set the same formatter for both handlers
formatter = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
self.stdout_handler.setFormatter(formatter)
self.stderr_handler.setFormatter(formatter)
def emit(self, record):
"""Route log records based on client type and level."""
# For Claude Desktop: strict JSON mode - suppress most output, route everything to stderr
if self.client_type == 'claude_desktop':
# Only emit WARNING and above to stderr to maintain JSON protocol
if record.levelno >= logging.WARNING:
self.stderr_handler.emit(record)
# Suppress INFO/DEBUG for Claude Desktop to prevent JSON parsing errors
return
# For LM Studio: enhanced mode with dual-stream
if record.levelno >= logging.WARNING: # WARNING, ERROR, CRITICAL
self.stderr_handler.emit(record)
else: # DEBUG, INFO
self.stdout_handler.emit(record)
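# Routing summary: for 'claude_desktop', INFO/DEBUG records are dropped entirely and only
# WARNING and above reach stderr, keeping stdout clean for JSON-RPC traffic; for
# 'lm_studio', INFO/DEBUG go to stdout and WARNING and above go to stderr.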
# Configure logging with client-aware handler BEFORE any imports that use logging
log_level = os.getenv('LOG_LEVEL', 'WARNING').upper() # Default to WARNING for performance
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, log_level, logging.WARNING))
# Remove any existing handlers to avoid duplicates
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Add our custom client-aware handler
client_aware_handler = DualStreamHandler(client_type=MCP_CLIENT)
root_logger.addHandler(client_aware_handler)
logger = logging.getLogger(__name__)
# Enhanced path detection for Claude Desktop compatibility
def setup_python_paths():
"""Setup Python paths for dependency access."""
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))  # three levels up from this file (expected project root)
# Check for virtual environment
potential_venv_paths = [
os.path.join(current_dir, 'venv', 'Lib', 'site-packages'), # Windows venv
os.path.join(current_dir, 'venv', 'lib', 'python3.11', 'site-packages'), # Linux/Mac venv
os.path.join(current_dir, '.venv', 'Lib', 'site-packages'), # Windows .venv
os.path.join(current_dir, '.venv', 'lib', 'python3.11', 'site-packages'), # Linux/Mac .venv
]
for venv_path in potential_venv_paths:
if os.path.exists(venv_path):
sys.path.insert(0, venv_path)
logger.debug(f"Added venv path: {venv_path}")
break
# For Claude Desktop: also check if we can access global site-packages
try:
import site
global_paths = site.getsitepackages()
user_path = site.getusersitepackages()
# Add user site-packages if not blocked by PYTHONNOUSERSITE
if not os.environ.get('PYTHONNOUSERSITE') and user_path not in sys.path:
sys.path.append(user_path)
logger.debug(f"Added user site-packages: {user_path}")
# Add global site-packages if available
for path in global_paths:
if path not in sys.path:
sys.path.append(path)
logger.debug(f"Added global site-packages: {path}")
except Exception as e:
logger.warning(f"Could not access site-packages: {e}")
# Setup paths before other imports
setup_python_paths()
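# Path precedence: a detected virtual environment is prepended to sys.path and therefore
# takes priority, while user and global site-packages are only appended as fallbacks.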
import asyncio
import traceback
import json
import platform
from collections import deque
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from mcp.types import Resource, Prompt
from . import __version__
from .lm_studio_compat import patch_mcp_for_lm_studio, add_windows_timeout_handling
from .dependency_check import run_dependency_check, get_recommended_timeout
from .config import (
BACKUPS_PATH,
SERVER_NAME,
SERVER_VERSION,
STORAGE_BACKEND,
SQLITE_VEC_PATH,
CONSOLIDATION_ENABLED,
CONSOLIDATION_CONFIG,
CONSOLIDATION_SCHEDULE,
INCLUDE_HOSTNAME,
# Cloudflare configuration
CLOUDFLARE_API_TOKEN,
CLOUDFLARE_ACCOUNT_ID,
CLOUDFLARE_VECTORIZE_INDEX,
CLOUDFLARE_D1_DATABASE_ID,
CLOUDFLARE_R2_BUCKET,
CLOUDFLARE_EMBEDDING_MODEL,
CLOUDFLARE_LARGE_CONTENT_THRESHOLD,
CLOUDFLARE_MAX_RETRIES,
CLOUDFLARE_BASE_DELAY,
# Hybrid backend configuration
HYBRID_SYNC_INTERVAL,
HYBRID_BATCH_SIZE,
HYBRID_SYNC_ON_STARTUP
)
# Storage imports will be done conditionally in the server class
from .models.memory import Memory
from .utils.hashing import generate_content_hash
from .utils.system_detection import (
get_system_info,
print_system_diagnostics,
AcceleratorType
)
from .utils.time_parser import extract_time_expression, parse_time_expression
# Consolidation system imports (conditional)
if CONSOLIDATION_ENABLED:
from .consolidation.base import ConsolidationConfig
from .consolidation.consolidator import DreamInspiredConsolidator
from .consolidation.scheduler import ConsolidationScheduler
# Note: Logging is already configured at the top of the file with dual-stream handler
# Configure performance-critical module logging
if not os.getenv('DEBUG_MODE'):
# Set higher log levels for performance-critical modules
for module_name in ['sentence_transformers', 'transformers', 'torch', 'numpy']:
logging.getLogger(module_name).setLevel(logging.WARNING)
# Check if UV is being used
def check_uv_environment():
"""Check if UV is being used and provide recommendations if not."""
running_with_uv = 'UV_ACTIVE' in os.environ or any('uv' in arg.lower() for arg in sys.argv)
if not running_with_uv:
logger.info("Memory server is running without UV. For better performance and dependency management, consider using UV:")
logger.info(" pip install uv")
logger.info(" uv run memory")
else:
logger.info("Memory server is running with UV")
return running_with_uv
# Configure environment variables based on detected system
def configure_environment():
"""Configure environment variables based on detected system."""
system_info = get_system_info()
# Log system information
logger.info(f"Detected system: {system_info.os_name} {system_info.architecture}")
logger.info(f"Memory: {system_info.memory_gb:.2f} GB")
logger.info(f"Accelerator: {system_info.accelerator}")
# Set environment variables for better cross-platform compatibility
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
# For Apple Silicon, ensure we use MPS when available
if system_info.architecture == "arm64" and system_info.os_name == "darwin":
logger.info("Configuring for Apple Silicon")
os.environ["PYTORCH_MPS_HIGH_WATERMARK_RATIO"] = "0.0"
# For Windows with CUDA, cap the allocator's split size to reduce GPU memory fragmentation
if system_info.os_name == "windows" and system_info.accelerator == AcceleratorType.CUDA:
logger.info("Configuring for Windows with CUDA")
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128"
# For Linux with ROCm, ensure we use the right backend
if system_info.os_name == "linux" and system_info.accelerator == AcceleratorType.ROCm:
logger.info("Configuring for Linux with ROCm")
os.environ["HSA_OVERRIDE_GFX_VERSION"] = "10.3.0"
# For systems with limited memory, keep model caches in one local directory under the service data path
if system_info.memory_gb < 8:
logger.info("Configuring for low-memory system")
# Use BACKUPS_PATH parent directory for model caches
cache_base = os.path.dirname(BACKUPS_PATH)
os.environ["TRANSFORMERS_CACHE"] = os.path.join(cache_base, "model_cache")
os.environ["HF_HOME"] = os.path.join(cache_base, "hf_cache")
os.environ["SENTENCE_TRANSFORMERS_HOME"] = os.path.join(cache_base, "st_cache")
# Configure environment before any imports that might use it
configure_environment()
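# Note: the cache variables above (TRANSFORMERS_CACHE, HF_HOME, SENTENCE_TRANSFORMERS_HOME)
# generally only take effect if set before the corresponding libraries are first imported,
# which is why configure_environment() runs ahead of any storage/embedding imports.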
# Performance optimization environment variables
def configure_performance_environment():
"""Configure environment variables for optimal performance."""
# PyTorch optimizations
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128,garbage_collection_threshold:0.6"
# CPU optimizations
os.environ["OMP_NUM_THREADS"] = str(min(8, os.cpu_count() or 1))
os.environ["MKL_NUM_THREADS"] = str(min(8, os.cpu_count() or 1))
# Disable unnecessary features for performance
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["TRANSFORMERS_NO_ADVISORY_WARNINGS"] = "1"
# Async CUDA operations
os.environ["CUDA_LAUNCH_BLOCKING"] = "0"
# Apply performance optimizations
configure_performance_environment()
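# These settings are process-wide: the thread caps guard against OpenMP/MKL oversubscription,
# TOKENIZERS_PARALLELISM=false silences fork-related tokenizer warnings, and
# CUDA_LAUNCH_BLOCKING=0 keeps CUDA kernel launches asynchronous (the default behavior).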
class MemoryServer:
def __init__(self):
"""Initialize the server with hardware-aware configuration."""
self.server = Server(SERVER_NAME)
self.system_info = get_system_info()
# Initialize query time tracking
self.query_times = deque(maxlen=50) # Keep last 50 query times for averaging
# Initialize progress tracking
self.current_progress = {} # Track ongoing operations
# Initialize consolidation system (if enabled)
self.consolidator = None
self.consolidation_scheduler = None
if CONSOLIDATION_ENABLED:
try:
config = ConsolidationConfig(**CONSOLIDATION_CONFIG)  # validates the config early; the consolidator itself is created after storage is ready
self.consolidator = None # Will be initialized after storage
self.consolidation_scheduler = None # Will be initialized after consolidator
logger.info("Consolidation system will be initialized after storage")
except Exception as e:
logger.error(f"Failed to initialize consolidation config: {e}")
self.consolidator = None
self.consolidation_scheduler = None
try:
# Initialize paths
logger.info(f"Creating directories if they don't exist...")
os.makedirs(BACKUPS_PATH, exist_ok=True)
# Log system diagnostics
logger.info(f"Initializing on {platform.system()} {platform.machine()} with Python {platform.python_version()}")
logger.info(f"Using accelerator: {self.system_info.accelerator}")
# DEFER STORAGE INITIALIZATION - Initialize storage lazily when needed
# This prevents hanging during server startup due to embedding model loading
logger.info(f"Deferring {STORAGE_BACKEND} storage initialization to prevent hanging")
if MCP_CLIENT == 'lm_studio':
print(f"Deferring {STORAGE_BACKEND} storage initialization to prevent startup hanging", file=sys.stdout, flush=True)
self.storage = None
self._storage_initialized = False
except Exception as e:
logger.error(f"Initialization error: {str(e)}")
logger.error(traceback.format_exc())
# Set storage to None to prevent any hanging
self.storage = None
self._storage_initialized = False
# Register handlers
self.register_handlers()
logger.info("Server initialization complete")
# Test handler registration with proper arguments
try:
logger.info("Testing handler registration...")
capabilities = self.server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={}
)
logger.info(f"Server capabilities: {capabilities}")
if MCP_CLIENT == 'lm_studio':
print(f"Server capabilities registered successfully!", file=sys.stdout, flush=True)
except Exception as e:
logger.error(f"Handler registration test failed: {str(e)}")
print(f"Handler registration issue: {str(e)}", file=sys.stderr, flush=True)
def record_query_time(self, query_time_ms: float):
"""Record a query time for averaging."""
self.query_times.append(query_time_ms)
logger.debug(f"Recorded query time: {query_time_ms:.2f}ms")
def get_average_query_time(self) -> float:
"""Get the average query time from recent operations."""
if not self.query_times:
return 0.0
avg = sum(self.query_times) / len(self.query_times)
logger.debug(f"Average query time: {avg:.2f}ms (from {len(self.query_times)} samples)")
return round(avg, 2)
async def send_progress_notification(self, operation_id: str, progress: float, message: str = None):
"""Send a progress notification for a long-running operation."""
try:
# Store progress for potential querying
self.current_progress[operation_id] = {
"progress": progress,
"message": message or f"Operation {operation_id}: {progress:.0f}% complete",
"timestamp": datetime.now().isoformat()
}
# Send notification if server supports it
if hasattr(self.server, 'send_progress_notification'):
await self.server.send_progress_notification(
progress=progress,
progress_token=operation_id,
message=message
)
logger.debug(f"Progress {operation_id}: {progress:.0f}% - {message}")
# Clean up completed operations
if progress >= 100:
self.current_progress.pop(operation_id, None)
except Exception as e:
logger.debug(f"Could not send progress notification: {e}")
def get_operation_progress(self, operation_id: str) -> Optional[Dict[str, Any]]:
"""Get the current progress of an operation."""
return self.current_progress.get(operation_id)
async def _initialize_storage_with_timeout(self):
"""Initialize storage with timeout and caching optimization."""
try:
logger.info(f"🚀 EAGER INIT: Starting {STORAGE_BACKEND} storage initialization...")
logger.info(f"🔧 EAGER INIT: Environment check - STORAGE_BACKEND={STORAGE_BACKEND}")
# Log all Cloudflare config values for debugging
if STORAGE_BACKEND == 'cloudflare':
logger.info(f"🔧 EAGER INIT: Cloudflare config validation:")
logger.info(f" API_TOKEN: {'SET' if CLOUDFLARE_API_TOKEN else 'NOT SET'}")
logger.info(f" ACCOUNT_ID: {CLOUDFLARE_ACCOUNT_ID}")
logger.info(f" VECTORIZE_INDEX: {CLOUDFLARE_VECTORIZE_INDEX}")
logger.info(f" D1_DATABASE_ID: {CLOUDFLARE_D1_DATABASE_ID}")
logger.info(f" R2_BUCKET: {CLOUDFLARE_R2_BUCKET}")
logger.info(f" EMBEDDING_MODEL: {CLOUDFLARE_EMBEDDING_MODEL}")
if STORAGE_BACKEND == 'sqlite_vec':
# Check for multi-client coordination mode
from .utils.port_detection import ServerCoordinator
coordinator = ServerCoordinator()
coordination_mode = await coordinator.detect_mode()
logger.info(f"🔧 EAGER INIT: SQLite-vec - detected coordination mode: {coordination_mode}")
if coordination_mode == "http_client":
# Use HTTP client to connect to existing server
from .storage.http_client import HTTPClientStorage
self.storage = HTTPClientStorage()
logger.info(f"✅ EAGER INIT: Using HTTP client storage")
elif coordination_mode == "http_server":
# Try to auto-start HTTP server for coordination
from .utils.http_server_manager import auto_start_http_server_if_needed
server_started = await auto_start_http_server_if_needed()
if server_started:
# Wait a moment for the server to be ready, then use HTTP client
await asyncio.sleep(2)
from .storage.http_client import HTTPClientStorage
self.storage = HTTPClientStorage()
logger.info(f"✅ EAGER INIT: Started HTTP server and using HTTP client storage")
else:
# Fall back to direct SQLite-vec storage
from . import storage
import importlib
storage_module = importlib.import_module('mcp_memory_service.storage.sqlite_vec')
SqliteVecMemoryStorage = storage_module.SqliteVecMemoryStorage
self.storage = SqliteVecMemoryStorage(SQLITE_VEC_PATH)
logger.info(f"✅ EAGER INIT: HTTP server auto-start failed, using direct SQLite-vec storage")
else:
# Import sqlite-vec storage module (supports dynamic class replacement)
from . import storage
import importlib
storage_module = importlib.import_module('mcp_memory_service.storage.sqlite_vec')
SqliteVecMemoryStorage = storage_module.SqliteVecMemoryStorage
self.storage = SqliteVecMemoryStorage(SQLITE_VEC_PATH)
logger.info(f"✅ EAGER INIT: Using direct SQLite-vec storage at {SQLITE_VEC_PATH}")
elif STORAGE_BACKEND == 'cloudflare':
# Initialize Cloudflare storage
logger.info(f"☁️ EAGER INIT: Importing CloudflareStorage...")
from .storage.cloudflare import CloudflareStorage
logger.info(f"☁️ EAGER INIT: Creating CloudflareStorage instance...")
self.storage = CloudflareStorage(
api_token=CLOUDFLARE_API_TOKEN,
account_id=CLOUDFLARE_ACCOUNT_ID,
vectorize_index=CLOUDFLARE_VECTORIZE_INDEX,
d1_database_id=CLOUDFLARE_D1_DATABASE_ID,
r2_bucket=CLOUDFLARE_R2_BUCKET,
embedding_model=CLOUDFLARE_EMBEDDING_MODEL,
large_content_threshold=CLOUDFLARE_LARGE_CONTENT_THRESHOLD,
max_retries=CLOUDFLARE_MAX_RETRIES,
base_delay=CLOUDFLARE_BASE_DELAY
)
logger.info(f"✅ EAGER INIT: CloudflareStorage instance created with index: {CLOUDFLARE_VECTORIZE_INDEX}")
elif STORAGE_BACKEND == 'hybrid':
# Initialize Hybrid storage (SQLite-vec + Cloudflare)
logger.info(f"🔄 EAGER INIT: Using Hybrid storage...")
from .storage.hybrid import HybridMemoryStorage
# Prepare Cloudflare configuration dict
cloudflare_config = None
if all([CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CLOUDFLARE_VECTORIZE_INDEX, CLOUDFLARE_D1_DATABASE_ID]):
cloudflare_config = {
'api_token': CLOUDFLARE_API_TOKEN,
'account_id': CLOUDFLARE_ACCOUNT_ID,
'vectorize_index': CLOUDFLARE_VECTORIZE_INDEX,
'd1_database_id': CLOUDFLARE_D1_DATABASE_ID,
'r2_bucket': CLOUDFLARE_R2_BUCKET,
'embedding_model': CLOUDFLARE_EMBEDDING_MODEL,
'large_content_threshold': CLOUDFLARE_LARGE_CONTENT_THRESHOLD,
'max_retries': CLOUDFLARE_MAX_RETRIES,
'base_delay': CLOUDFLARE_BASE_DELAY
}
logger.info(f"🔄 EAGER INIT: Cloudflare config prepared for hybrid storage")
else:
logger.warning("🔄 EAGER INIT: Incomplete Cloudflare config, hybrid will run in SQLite-only mode")
self.storage = HybridMemoryStorage(
sqlite_db_path=SQLITE_VEC_PATH,
embedding_model="all-MiniLM-L6-v2",
cloudflare_config=cloudflare_config,
sync_interval=HYBRID_SYNC_INTERVAL or 300,
batch_size=HYBRID_BATCH_SIZE or 50
)
logger.info(f"✅ EAGER INIT: HybridMemoryStorage instance created")
else:
# Unknown backend - should not reach here due to factory validation
logger.error(f"❌ EAGER INIT: Unknown storage backend: {STORAGE_BACKEND}")
raise ValueError(f"Unsupported storage backend: {STORAGE_BACKEND}")
# Initialize the storage backend
logger.info(f"🔧 EAGER INIT: Calling storage.initialize()...")
await self.storage.initialize()
logger.info(f"✅ EAGER INIT: storage.initialize() completed successfully")
self._storage_initialized = True
logger.info(f"🎉 EAGER INIT: {STORAGE_BACKEND} storage initialization successful")
# Verify storage type
storage_type = self.storage.__class__.__name__
logger.info(f"🔍 EAGER INIT: Final storage type verification: {storage_type}")
# Initialize consolidation system after storage is ready
await self._initialize_consolidation()
return True
except Exception as e:
logger.error(f"❌ EAGER INIT: Storage initialization failed: {str(e)}")
logger.error(f"📋 EAGER INIT: Full traceback:")
logger.error(traceback.format_exc())
return False
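# A False return here signals initialize() to fall back to lazy loading, in which case
# storage is created on first use via _ensure_storage_initialized().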
async def _ensure_storage_initialized(self):
"""Lazily initialize storage backend when needed."""
if not self._storage_initialized:
try:
logger.info(f"🔄 LAZY INIT: Starting {STORAGE_BACKEND} storage initialization...")
logger.info(f"🔧 LAZY INIT: Environment check - STORAGE_BACKEND={STORAGE_BACKEND}")
# Log all Cloudflare config values for debugging
if STORAGE_BACKEND == 'cloudflare':
logger.info(f"🔧 LAZY INIT: Cloudflare config validation:")
logger.info(f" API_TOKEN: {'SET' if CLOUDFLARE_API_TOKEN else 'NOT SET'}")
logger.info(f" ACCOUNT_ID: {CLOUDFLARE_ACCOUNT_ID}")
logger.info(f" VECTORIZE_INDEX: {CLOUDFLARE_VECTORIZE_INDEX}")
logger.info(f" D1_DATABASE_ID: {CLOUDFLARE_D1_DATABASE_ID}")
logger.info(f" R2_BUCKET: {CLOUDFLARE_R2_BUCKET}")
logger.info(f" EMBEDDING_MODEL: {CLOUDFLARE_EMBEDDING_MODEL}")
if STORAGE_BACKEND == 'sqlite_vec':
# Check for multi-client coordination mode
from .utils.port_detection import ServerCoordinator
coordinator = ServerCoordinator()
coordination_mode = await coordinator.detect_mode()
logger.info(f"🔧 LAZY INIT: SQLite-vec - detected coordination mode: {coordination_mode}")
if coordination_mode == "http_client":
# Use HTTP client to connect to existing server
from .storage.http_client import HTTPClientStorage
self.storage = HTTPClientStorage()
logger.info(f"✅ LAZY INIT: Using HTTP client storage")
elif coordination_mode == "http_server":
# Try to auto-start HTTP server for coordination
from .utils.http_server_manager import auto_start_http_server_if_needed
server_started = await auto_start_http_server_if_needed()
if server_started:
# Wait a moment for the server to be ready, then use HTTP client
await asyncio.sleep(2)
from .storage.http_client import HTTPClientStorage
self.storage = HTTPClientStorage()
logger.info(f"✅ LAZY INIT: Started HTTP server and using HTTP client storage")
else:
# Fall back to direct SQLite-vec storage
import importlib
storage_module = importlib.import_module('mcp_memory_service.storage.sqlite_vec')
SqliteVecMemoryStorage = storage_module.SqliteVecMemoryStorage
self.storage = SqliteVecMemoryStorage(SQLITE_VEC_PATH)
logger.info(f"✅ LAZY INIT: HTTP server auto-start failed, using direct SQLite-vec storage at: {SQLITE_VEC_PATH}")
else:
# Use direct SQLite-vec storage (with WAL mode for concurrent access)
import importlib
storage_module = importlib.import_module('mcp_memory_service.storage.sqlite_vec')
SqliteVecMemoryStorage = storage_module.SqliteVecMemoryStorage
self.storage = SqliteVecMemoryStorage(SQLITE_VEC_PATH)
logger.info(f"✅ LAZY INIT: Created SQLite-vec storage at: {SQLITE_VEC_PATH}")
elif STORAGE_BACKEND == 'cloudflare':
# Cloudflare backend using Vectorize, D1, and R2
logger.info(f"☁️ LAZY INIT: Importing CloudflareStorage...")
from .storage.cloudflare import CloudflareStorage
logger.info(f"☁️ LAZY INIT: Creating CloudflareStorage instance...")
self.storage = CloudflareStorage(
api_token=CLOUDFLARE_API_TOKEN,
account_id=CLOUDFLARE_ACCOUNT_ID,
vectorize_index=CLOUDFLARE_VECTORIZE_INDEX,
d1_database_id=CLOUDFLARE_D1_DATABASE_ID,
r2_bucket=CLOUDFLARE_R2_BUCKET,
embedding_model=CLOUDFLARE_EMBEDDING_MODEL,
large_content_threshold=CLOUDFLARE_LARGE_CONTENT_THRESHOLD,
max_retries=CLOUDFLARE_MAX_RETRIES,
base_delay=CLOUDFLARE_BASE_DELAY
)
logger.info(f"✅ LAZY INIT: Created Cloudflare storage with Vectorize index: {CLOUDFLARE_VECTORIZE_INDEX}")
elif STORAGE_BACKEND == 'hybrid':
# Hybrid backend using SQLite-vec as primary and Cloudflare as secondary
logger.info(f"🔄 LAZY INIT: Importing HybridMemoryStorage...")
from .storage.hybrid import HybridMemoryStorage
# Prepare Cloudflare configuration dict
cloudflare_config = None
if all([CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CLOUDFLARE_VECTORIZE_INDEX, CLOUDFLARE_D1_DATABASE_ID]):
cloudflare_config = {
'api_token': CLOUDFLARE_API_TOKEN,
'account_id': CLOUDFLARE_ACCOUNT_ID,
'vectorize_index': CLOUDFLARE_VECTORIZE_INDEX,
'd1_database_id': CLOUDFLARE_D1_DATABASE_ID,
'r2_bucket': CLOUDFLARE_R2_BUCKET,
'embedding_model': CLOUDFLARE_EMBEDDING_MODEL,
'large_content_threshold': CLOUDFLARE_LARGE_CONTENT_THRESHOLD,
'max_retries': CLOUDFLARE_MAX_RETRIES,
'base_delay': CLOUDFLARE_BASE_DELAY
}
logger.info(f"🔄 LAZY INIT: Cloudflare config prepared for hybrid storage")
else:
logger.warning("🔄 LAZY INIT: Incomplete Cloudflare config, hybrid will run in SQLite-only mode")
logger.info(f"🔄 LAZY INIT: Creating HybridMemoryStorage instance...")
self.storage = HybridMemoryStorage(
sqlite_db_path=SQLITE_VEC_PATH,
embedding_model="all-MiniLM-L6-v2",
cloudflare_config=cloudflare_config,
sync_interval=HYBRID_SYNC_INTERVAL or 300,
batch_size=HYBRID_BATCH_SIZE or 50
)
logger.info(f"✅ LAZY INIT: Created Hybrid storage at: {SQLITE_VEC_PATH} with Cloudflare sync")
else:
# Unknown/unsupported backend
logger.error("=" * 70)
logger.error(f"❌ LAZY INIT: Unsupported storage backend: {STORAGE_BACKEND}")
logger.error("")
logger.error("Supported backends:")
logger.error(" - sqlite_vec (recommended for single-device use)")
logger.error(" - cloudflare (cloud storage)")
logger.error(" - hybrid (recommended for multi-device use)")
logger.error("=" * 70)
raise ValueError(
f"Unsupported storage backend: {STORAGE_BACKEND}. "
"Use 'sqlite_vec', 'cloudflare', or 'hybrid'."
)
# Initialize the storage backend
logger.info(f"🔧 LAZY INIT: Calling storage.initialize()...")
await self.storage.initialize()
logger.info(f"✅ LAZY INIT: storage.initialize() completed successfully")
# Verify the storage is properly initialized
if hasattr(self.storage, 'is_initialized') and not self.storage.is_initialized():
# Get detailed status for debugging
if hasattr(self.storage, 'get_initialization_status'):
status = self.storage.get_initialization_status()
logger.error(f"❌ LAZY INIT: Storage initialization incomplete: {status}")
raise RuntimeError("Storage initialization incomplete")
self._storage_initialized = True
storage_type = self.storage.__class__.__name__
logger.info(f"🎉 LAZY INIT: Storage backend ({STORAGE_BACKEND}) initialization successful")
logger.info(f"🔍 LAZY INIT: Final storage type verification: {storage_type}")
# Initialize consolidation system after storage is ready
await self._initialize_consolidation()
except Exception as e:
logger.error(f"❌ LAZY INIT: Failed to initialize {STORAGE_BACKEND} storage: {str(e)}")
logger.error(f"📋 LAZY INIT: Full traceback:")
logger.error(traceback.format_exc())
# Set storage to None to indicate failure
self.storage = None
self._storage_initialized = False
raise
return self.storage
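# Handlers call _ensure_storage_initialized() at the start of each request, so when eager
# initialization was skipped the first tool or resource call absorbs the storage (and
# embedding model) startup cost.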
async def initialize(self):
"""Async initialization method with eager storage initialization and timeout."""
try:
# Run any async initialization tasks here
logger.info("🚀 SERVER INIT: Starting async initialization...")
# Print system diagnostics only for LM Studio (avoid JSON parsing errors in Claude Desktop)
if MCP_CLIENT == 'lm_studio':
print("\n=== System Diagnostics ===", file=sys.stdout, flush=True)
print(f"OS: {self.system_info.os_name} {self.system_info.os_version}", file=sys.stdout, flush=True)
print(f"Architecture: {self.system_info.architecture}", file=sys.stdout, flush=True)
print(f"Memory: {self.system_info.memory_gb:.2f} GB", file=sys.stdout, flush=True)
print(f"Accelerator: {self.system_info.accelerator}", file=sys.stdout, flush=True)
print(f"Python: {platform.python_version()}", file=sys.stdout, flush=True)
# Log environment info
logger.info(f"🔧 SERVER INIT: Environment - STORAGE_BACKEND={STORAGE_BACKEND}")
# Attempt eager storage initialization with timeout
# Get dynamic timeout based on system and dependency status
timeout_seconds = get_recommended_timeout()
logger.info(f"⏱️ SERVER INIT: Attempting eager storage initialization (timeout: {timeout_seconds}s)...")
if MCP_CLIENT == 'lm_studio':
print(f"Attempting eager storage initialization (timeout: {timeout_seconds}s)...", file=sys.stdout, flush=True)
try:
init_task = asyncio.create_task(self._initialize_storage_with_timeout())
success = await asyncio.wait_for(init_task, timeout=timeout_seconds)
if success:
if MCP_CLIENT == 'lm_studio':
print("[OK] Eager storage initialization successful", file=sys.stdout, flush=True)
logger.info("✅ SERVER INIT: Eager storage initialization completed successfully")
# Verify storage type after successful eager init
if hasattr(self, 'storage') and self.storage:
storage_type = self.storage.__class__.__name__
logger.info(f"🔍 SERVER INIT: Eager init resulted in storage type: {storage_type}")
else:
if MCP_CLIENT == 'lm_studio':
print("[WARNING] Eager storage initialization failed, will use lazy loading", file=sys.stdout, flush=True)
logger.warning("⚠️ SERVER INIT: Eager initialization failed, falling back to lazy loading")
# Reset state for lazy loading
self.storage = None
self._storage_initialized = False
except asyncio.TimeoutError:
if MCP_CLIENT == 'lm_studio':
print("[TIMEOUT] Eager storage initialization timed out, will use lazy loading", file=sys.stdout, flush=True)
logger.warning(f"⏱️ SERVER INIT: Storage initialization timed out after {timeout_seconds}s, falling back to lazy loading")
# Reset state for lazy loading
self.storage = None
self._storage_initialized = False
except Exception as e:
if MCP_CLIENT == 'lm_studio':
print(f"[WARNING] Eager initialization error: {str(e)}, will use lazy loading", file=sys.stdout, flush=True)
logger.warning(f"⚠️ SERVER INIT: Eager initialization error: {str(e)}, falling back to lazy loading")
logger.warning(f"📋 SERVER INIT: Eager init error traceback:")
logger.warning(traceback.format_exc())
# Reset state for lazy loading
self.storage = None
self._storage_initialized = False
# Add explicit console output for Smithery to see (only for LM Studio)
if MCP_CLIENT == 'lm_studio':
print("MCP Memory Service initialization completed", file=sys.stdout, flush=True)
logger.info("🎉 SERVER INIT: Async initialization completed")
return True
except Exception as e:
logger.error(f"❌ SERVER INIT: Async initialization error: {str(e)}")
logger.error(f"📋 SERVER INIT: Full traceback:")
logger.error(traceback.format_exc())
# Add explicit console error output for Smithery to see
print(f"Initialization error: {str(e)}", file=sys.stderr, flush=True)
# Don't raise the exception, just return False
return False
async def validate_database_health(self):
"""Validate database health during initialization."""
from .utils.db_utils import validate_database, repair_database
try:
# Check database health
is_valid, message = await validate_database(self.storage)
if not is_valid:
logger.warning(f"Database validation failed: {message}")
# Attempt repair
logger.info("Attempting database repair...")
repair_success, repair_message = await repair_database(self.storage)
if not repair_success:
logger.error(f"Database repair failed: {repair_message}")
return False
else:
logger.info(f"Database repair successful: {repair_message}")
return True
else:
logger.info(f"Database validation successful: {message}")
return True
except Exception as e:
logger.error(f"Database validation error: {str(e)}")
return False
async def _initialize_consolidation(self):
"""Initialize the consolidation system after storage is ready."""
if not CONSOLIDATION_ENABLED or not self._storage_initialized:
return
try:
if self.consolidator is None:
# Create consolidation config
config = ConsolidationConfig(**CONSOLIDATION_CONFIG)
# Initialize the consolidator with storage
self.consolidator = DreamInspiredConsolidator(self.storage, config)
logger.info("Dream-inspired consolidator initialized")
# Initialize the scheduler if not disabled
if any(schedule != 'disabled' for schedule in CONSOLIDATION_SCHEDULE.values()):
self.consolidation_scheduler = ConsolidationScheduler(
self.consolidator,
CONSOLIDATION_SCHEDULE,
enabled=True
)
# Start the scheduler
if await self.consolidation_scheduler.start():
logger.info("Consolidation scheduler started successfully")
else:
logger.warning("Failed to start consolidation scheduler")
self.consolidation_scheduler = None
else:
logger.info("Consolidation scheduler disabled (all schedules set to 'disabled')")
except Exception as e:
logger.error(f"Failed to initialize consolidation system: {e}")
logger.error(traceback.format_exc())
self.consolidator = None
self.consolidation_scheduler = None
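# The scheduler is only started when at least one entry in CONSOLIDATION_SCHEDULE is not
# 'disabled'; otherwise consolidation remains available only through the manual MCP tools
# (e.g. consolidate_memories, trigger_consolidation).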
def handle_method_not_found(self, method: str) -> None:
"""Custom handler for unsupported methods.
This logs the unsupported method request but doesn't raise an exception,
allowing the MCP server to handle it with a standard JSON-RPC error response.
"""
logger.warning(f"Unsupported method requested: {method}")
# The MCP server will automatically respond with a Method not found error
# We don't need to do anything else here
def register_handlers(self):
# Enhanced Resources implementation
@self.server.list_resources()
async def handle_list_resources() -> List[Resource]:
"""List available memory resources."""
await self._ensure_storage_initialized()
resources = [
types.Resource(
uri="memory://stats",
name="Memory Statistics",
description="Current memory database statistics",
mimeType="application/json"
),
types.Resource(
uri="memory://tags",
name="Available Tags",
description="List of all tags used in memories",
mimeType="application/json"
),
types.Resource(
uri="memory://recent/10",
name="Recent Memories",
description="10 most recent memories",
mimeType="application/json"
)
]
# Add tag-specific resources for existing tags
try:
all_tags = await self.storage.get_all_tags()
for tag in all_tags[:5]: # Limit to first 5 tags for resources
resources.append(types.Resource(
uri=f"memory://tag/{tag}",
name=f"Memories tagged '{tag}'",
description=f"All memories with tag '{tag}'",
mimeType="application/json"
))
except AttributeError:
# get_all_tags method not available on this storage backend
pass
except Exception as e:
logger.warning(f"Failed to load tag resources: {e}")
pass
return resources
@self.server.read_resource()
async def handle_read_resource(uri: str) -> str:
"""Read a specific memory resource."""
await self._ensure_storage_initialized()
import json
from urllib.parse import unquote
try:
if uri == "memory://stats":
# Get memory statistics
stats = await self.storage.get_stats()
return json.dumps(stats, indent=2)
elif uri == "memory://tags":
# Get all available tags
tags = await self.storage.get_all_tags()
return json.dumps({"tags": tags, "count": len(tags)}, indent=2)
elif uri.startswith("memory://recent/"):
# Get recent memories
n = int(uri.split("/")[-1])
memories = await self.storage.get_recent_memories(n)
return json.dumps({
"memories": [m.to_dict() for m in memories],
"count": len(memories)
}, indent=2, default=str)
elif uri.startswith("memory://tag/"):
# Get memories by tag
tag = unquote(uri.split("/", 3)[-1])
memories = await self.storage.search_by_tag([tag])
return json.dumps({
"tag": tag,
"memories": [m.to_dict() for m in memories],
"count": len(memories)
}, indent=2, default=str)
elif uri.startswith("memory://search/"):
# Dynamic search
query = unquote(uri.split("/", 3)[-1])
results = await self.storage.search(query, n_results=10)
return json.dumps({
"query": query,
"results": [r.to_dict() for r in results],
"count": len(results)
}, indent=2, default=str)
else:
return json.dumps({"error": f"Resource not found: {uri}"}, indent=2)
except Exception as e:
logger.error(f"Error reading resource {uri}: {e}")
return json.dumps({"error": str(e)}, indent=2)
@self.server.list_resource_templates()
async def handle_list_resource_templates() -> List[types.ResourceTemplate]:
"""List resource templates for dynamic queries."""
return [
types.ResourceTemplate(
uriTemplate="memory://recent/{n}",
name="Recent Memories",
description="Get N most recent memories",
mimeType="application/json"
),
types.ResourceTemplate(
uriTemplate="memory://tag/{tag}",
name="Memories by Tag",
description="Get all memories with a specific tag",
mimeType="application/json"
),
types.ResourceTemplate(
uriTemplate="memory://search/{query}",
name="Search Memories",
description="Search memories by query",
mimeType="application/json"
)
]
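# These templates correspond to the memory://recent/, memory://tag/ and memory://search/
# URI prefixes handled by handle_read_resource above.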
@self.server.list_prompts()
async def handle_list_prompts() -> List[types.Prompt]:
"""List available guided prompts for memory operations."""
return [
types.Prompt(
name="memory_review",
description="Review and organize memories from a specific time period",
arguments=[
types.PromptArgument(
name="time_period",
description="Time period to review (e.g., 'last week', 'yesterday', '2 days ago')",
required=True
),
types.PromptArgument(
name="focus_area",
description="Optional area to focus on (e.g., 'work', 'personal', 'learning')",
required=False
)
]
),
types.Prompt(
name="memory_analysis",
description="Analyze patterns and themes in stored memories",
arguments=[
types.PromptArgument(
name="tags",
description="Tags to analyze (comma-separated)",
required=False
),
types.PromptArgument(
name="time_range",
description="Time range to analyze (e.g., 'last month', 'all time')",
required=False
)
]
),
types.Prompt(
name="knowledge_export",
description="Export memories in a specific format",
arguments=[
types.PromptArgument(
name="format",
description="Export format (json, markdown, text)",
required=True
),
types.PromptArgument(
name="filter",
description="Filter criteria (tags or search query)",
required=False
)
]
),
types.Prompt(
name="memory_cleanup",
description="Identify and remove duplicate or outdated memories",
arguments=[
types.PromptArgument(
name="older_than",
description="Remove memories older than (e.g., '6 months', '1 year')",
required=False
),
types.PromptArgument(
name="similarity_threshold",
description="Similarity threshold for duplicates (0.0-1.0)",
required=False
)
]
),
types.Prompt(
name="learning_session",
description="Store structured learning notes from a study session",
arguments=[
types.PromptArgument(
name="topic",
description="Learning topic or subject",
required=True
),
types.PromptArgument(
name="key_points",
description="Key points learned (comma-separated)",
required=True
),
types.PromptArgument(
name="questions",
description="Questions or areas for further study",
required=False
)
]
)
]
@self.server.get_prompt()
async def handle_get_prompt(name: str, arguments: dict) -> types.GetPromptResult:
"""Handle prompt execution with provided arguments."""
await self._ensure_storage_initialized()
messages = []
if name == "memory_review":
time_period = arguments.get("time_period", "last week")
focus_area = arguments.get("focus_area", "")
# Retrieve memories from the specified time period
memories = await self.storage.recall_memory(time_period, n_results=20)
prompt_text = f"Review of memories from {time_period}"
if focus_area:
prompt_text += f" (focusing on {focus_area})"
prompt_text += ":\n\n"
if memories:
for mem in memories:
prompt_text += f"- {mem.content}\n"
if mem.metadata.tags:
prompt_text += f" Tags: {', '.join(mem.metadata.tags)}\n"
else:
prompt_text += "No memories found for this time period."
messages = [
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=prompt_text
)
)
]
elif name == "memory_analysis":
tags = arguments.get("tags", "").split(",") if arguments.get("tags") else []
time_range = arguments.get("time_range", "all time")
analysis_text = f"Memory Analysis"
if tags:
analysis_text += f" for tags: {', '.join(tags)}"
if time_range != "all time":
analysis_text += f" from {time_range}"
analysis_text += "\n\n"
# Get relevant memories
if tags:
memories = await self.storage.search_by_tag(tags)
else:
memories = await self.storage.get_recent_memories(100)
# Analyze patterns
tag_counts = {}
type_counts = {}
for mem in memories:
for tag in mem.metadata.tags:
tag_counts[tag] = tag_counts.get(tag, 0) + 1
mem_type = mem.metadata.memory_type
type_counts[mem_type] = type_counts.get(mem_type, 0) + 1
analysis_text += f"Total memories analyzed: {len(memories)}\n\n"
analysis_text += "Top tags:\n"
for tag, count in sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)[:10]:
analysis_text += f" - {tag}: {count} occurrences\n"
analysis_text += "\nMemory types:\n"
for mem_type, count in type_counts.items():
analysis_text += f" - {mem_type}: {count} memories\n"
messages = [
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=analysis_text
)
)
]
elif name == "knowledge_export":
format_type = arguments.get("format", "json")
filter_criteria = arguments.get("filter", "")
# Get memories based on filter
if filter_criteria:
if "," in filter_criteria:
# Assume tags
memories = await self.storage.search_by_tag(filter_criteria.split(","))
else:
# Assume search query
memories = await self.storage.search(filter_criteria, n_results=100)
else:
memories = await self.storage.get_recent_memories(100)
export_text = f"Exported {len(memories)} memories in {format_type} format:\n\n"
if format_type == "markdown":
for mem in memories:
export_text += f"## {mem.metadata.created_at_iso}\n"
export_text += f"{mem.content}\n"
if mem.metadata.tags:
export_text += f"*Tags: {', '.join(mem.metadata.tags)}*\n"
export_text += "\n"
elif format_type == "text":
for mem in memories:
export_text += f"[{mem.metadata.created_at_iso}] {mem.content}\n"
else: # json
import json
export_data = [m.to_dict() for m in memories]
export_text += json.dumps(export_data, indent=2, default=str)
messages = [
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=export_text
)
)
]
elif name == "memory_cleanup":
older_than = arguments.get("older_than", "")
similarity_threshold = float(arguments.get("similarity_threshold", "0.95"))
cleanup_text = "Memory Cleanup Report:\n\n"
# Find duplicates
all_memories = await self.storage.get_recent_memories(1000)
duplicates = []
for i, mem1 in enumerate(all_memories):
for mem2 in all_memories[i+1:]:
# Simple similarity check based on content length
if abs(len(mem1.content) - len(mem2.content)) < 10:
if mem1.content[:50] == mem2.content[:50]:
duplicates.append((mem1, mem2))
cleanup_text += f"Found {len(duplicates)} potential duplicate pairs\n"
if older_than:
cleanup_text += f"\nMemories older than {older_than} can be archived\n"
messages = [
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=cleanup_text
)
)
]
elif name == "learning_session":
topic = arguments.get("topic", "General")
key_points = arguments.get("key_points", "").split(",")
questions = arguments.get("questions", "").split(",") if arguments.get("questions") else []
# Create structured learning note
learning_note = f"# Learning Session: {topic}\n\n"
learning_note += f"Date: {datetime.now().isoformat()}\n\n"
learning_note += "## Key Points:\n"
for point in key_points:
learning_note += f"- {point.strip()}\n"
if questions:
learning_note += "\n## Questions for Further Study:\n"
for question in questions:
learning_note += f"- {question.strip()}\n"
# Store the learning note
memory = Memory(
content=learning_note,
tags=["learning", topic.lower().replace(" ", "_")],
memory_type="learning_note"
)
success, message = await self.storage.store(memory)
response_text = f"Learning session stored successfully!\n\n{learning_note}"
if not success:
response_text = f"Failed to store learning session: {message}"
messages = [
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=response_text
)
)
]
else:
messages = [
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"Unknown prompt: {name}"
)
)
]
return types.GetPromptResult(
description=f"Result of {name} prompt",
messages=messages
)
# Add a custom error handler for unsupported methods
self.server.on_method_not_found = self.handle_method_not_found
@self.server.list_tools()
async def handle_list_tools() -> List[types.Tool]:
logger.info("=== HANDLING LIST_TOOLS REQUEST ===")
try:
tools = [
types.Tool(
name="store_memory",
description="""Store new information with optional tags.
Accepts two tag formats in metadata:
- Array: ["tag1", "tag2"]
- String: "tag1,tag2"
Examples:
# Using array format:
{
"content": "Memory content",
"metadata": {
"tags": ["important", "reference"],
"type": "note"
}
}
# Using string format (preferred):
{
"content": "Memory content",
"metadata": {
"tags": "important,reference",
"type": "note"
}
}""",
inputSchema={
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "The memory content to store, such as a fact, note, or piece of information."
},
"metadata": {
"type": "object",
"description": "Optional metadata about the memory, including tags and type.",
"properties": {
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Tags to categorize the memory as an array of strings. If you have a comma-separated string, convert it to an array before calling."
},
"type": {
"type": "string",
"description": "Optional type or category label for the memory, e.g., 'note', 'fact', 'reminder'."
}
}
}
},
"required": ["content"]
}
),
types.Tool(
name="recall_memory",
description="""Retrieve memories using natural language time expressions and optional semantic search.
Supports various time-related expressions such as:
- "yesterday", "last week", "2 days ago"
- "last summer", "this month", "last January"
- "spring", "winter", "Christmas", "Thanksgiving"
- "morning", "evening", "yesterday afternoon"
Examples:
{
"query": "recall what I stored last week"
}
{
"query": "find information about databases from two months ago",
"n_results": 5
}
""",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Natural language query specifying the time frame or content to recall, e.g., 'last week', 'yesterday afternoon', or a topic."
},
"n_results": {
"type": "number",
"default": 5,
"description": "Maximum number of results to return."
}
},
"required": ["query"]
}
),
types.Tool(
name="retrieve_memory",
description="""Find relevant memories based on query.
Example:
{
"query": "find this memory",
"n_results": 5
}""",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query to find relevant memories based on content."
},
"n_results": {
"type": "number",
"default": 5,
"description": "Maximum number of results to return."
}
},
"required": ["query"]
}
),
types.Tool(
name="search_by_tag",
description="""Search memories by tags. Must use array format.
Returns memories matching ANY of the specified tags.
Example:
{
"tags": ["important", "reference"]
}""",
inputSchema={
"type": "object",
"properties": {
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "List of tags to search for. Returns memories matching ANY of these tags."
}
},
"required": ["tags"]
}
),
types.Tool(
name="delete_memory",
description="""Delete a specific memory by its hash.
Example:
{
"content_hash": "a1b2c3d4..."
}""",
inputSchema={
"type": "object",
"properties": {
"content_hash": {
"type": "string",
"description": "Hash of the memory content to delete. Obtainable from memory metadata."
}
},
"required": ["content_hash"]
}
),
types.Tool(
name="delete_by_tag",
description="""Delete all memories with specific tags.
WARNING: Deletes ALL memories containing any of the specified tags.
Example:
{"tags": ["temporary", "outdated"]}""",
inputSchema={
"type": "object",
"properties": {
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Array of tag labels. Memories containing any of these tags will be deleted."
}
},
"required": ["tags"]
}
),
types.Tool(
name="delete_by_tags",
description="""Delete all memories containing any of the specified tags.
This is the explicit multi-tag version for API clarity.
WARNING: Deletes ALL memories containing any of the specified tags.
Example:
{
"tags": ["temporary", "outdated", "test"]
}""",
inputSchema={
"type": "object",
"properties": {
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "List of tag labels. Memories containing any of these tags will be deleted."
}
},
"required": ["tags"]
}
),
types.Tool(
name="delete_by_all_tags",
description="""Delete memories that contain ALL of the specified tags.
WARNING: Only deletes memories that have every one of the specified tags.
Example:
{
"tags": ["important", "urgent"]
}""",
inputSchema={
"type": "object",
"properties": {
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "List of tag labels. Only memories containing ALL of these tags will be deleted."
}
},
"required": ["tags"]
}
),
types.Tool(
name="cleanup_duplicates",
description="Find and remove duplicate entries",
inputSchema={
"type": "object",
"properties": {}
}
),
types.Tool(
name="debug_retrieve",
description="""Retrieve memories with debug information.
Example:
{
"query": "debug this",
"n_results": 5,
"similarity_threshold": 0.0
}""",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query for debugging retrieval, e.g., a phrase or keyword."
},
"n_results": {
"type": "number",
"default": 5,
"description": "Maximum number of results to return."
},
"similarity_threshold": {
"type": "number",
"default": 0.0,
"description": "Minimum similarity score threshold for results (0.0 to 1.0)."
}
},
"required": ["query"]
}
),
types.Tool(
name="exact_match_retrieve",
description="""Retrieve memories using exact content match.
Example:
{
"content": "find exactly this"
}""",
inputSchema={
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "Exact content string to match against stored memories."
}
},
"required": ["content"]
}
),
types.Tool(
name="check_database_health",
description="Check database health and get statistics",
inputSchema={
"type": "object",
"properties": {}
}
),
types.Tool(
name="recall_by_timeframe",
description="""Retrieve memories within a specific timeframe.
Example:
{
"start_date": "2024-01-01",
"end_date": "2024-01-31",
"n_results": 5
}""",
inputSchema={
"type": "object",
"properties": {
"start_date": {
"type": "string",
"format": "date",
"description": "Start date (inclusive) in YYYY-MM-DD format."
},
"end_date": {
"type": "string",
"format": "date",
"description": "End date (inclusive) in YYYY-MM-DD format."
},
"n_results": {
"type": "number",
"default": 5,
"description": "Maximum number of results to return."
}
},
"required": ["start_date"]
}
),
types.Tool(
name="delete_by_timeframe",
description="""Delete memories within a specific timeframe.
Optional tag parameter to filter deletions.
Example:
{
"start_date": "2024-01-01",
"end_date": "2024-01-31",
"tag": "temporary"
}""",
inputSchema={
"type": "object",
"properties": {
"start_date": {
"type": "string",
"format": "date",
"description": "Start date (inclusive) in YYYY-MM-DD format."
},
"end_date": {
"type": "string",
"format": "date",
"description": "End date (inclusive) in YYYY-MM-DD format."
},
"tag": {
"type": "string",
"description": "Optional tag to filter deletions. Only memories with this tag will be deleted."
}
},
"required": ["start_date"]
}
),
types.Tool(
name="delete_before_date",
description="""Delete memories before a specific date.
Optional tag parameter to filter deletions.
Example:
{
"before_date": "2024-01-01",
"tag": "temporary"
}""",
inputSchema={
"type": "object",
"properties": {
"before_date": {"type": "string", "format": "date"},
"tag": {"type": "string"}
},
"required": ["before_date"]
}
),
types.Tool(
name="update_memory_metadata",
description="""Update memory metadata without recreating the entire memory entry.
This provides efficient metadata updates while preserving the original
memory content, embeddings, and optionally timestamps.
Examples:
# Add tags to a memory
{
"content_hash": "abc123...",
"updates": {
"tags": ["important", "reference", "new-tag"]
}
}
# Update memory type and custom metadata
{
"content_hash": "abc123...",
"updates": {
"memory_type": "reminder",
"metadata": {
"priority": "high",
"due_date": "2024-01-15"
}
}
}
# Update custom fields directly
{
"content_hash": "abc123...",
"updates": {
"priority": "urgent",
"status": "active"
}
}""",
inputSchema={
"type": "object",
"properties": {
"content_hash": {
"type": "string",
"description": "The content hash of the memory to update."
},
"updates": {
"type": "object",
"description": "Dictionary of metadata fields to update.",
"properties": {
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Replace existing tags with this list."
},
"memory_type": {
"type": "string",
"description": "Update the memory type (e.g., 'note', 'reminder', 'fact')."
},
"metadata": {
"type": "object",
"description": "Custom metadata fields to merge with existing metadata."
}
}
},
"preserve_timestamps": {
"type": "boolean",
"default": True,
"description": "Whether to preserve the original created_at timestamp (default: true)."
}
},
"required": ["content_hash", "updates"]
}
)
]
# Add consolidation tools if enabled
if CONSOLIDATION_ENABLED and self.consolidator:
consolidation_tools = [
types.Tool(
name="consolidate_memories",
description="""Run memory consolidation for a specific time horizon.
Performs dream-inspired memory consolidation including:
- Exponential decay scoring
- Creative association discovery
- Semantic clustering and compression
- Controlled forgetting with archival
Example:
{
"time_horizon": "weekly"
}""",
inputSchema={
"type": "object",
"properties": {
"time_horizon": {
"type": "string",
"enum": ["daily", "weekly", "monthly", "quarterly", "yearly"],
"description": "Time horizon for consolidation operations."
}
},
"required": ["time_horizon"]
}
),
types.Tool(
name="consolidation_status",
description="Get status and health information about the consolidation system.",
inputSchema={"type": "object", "properties": {}}
),
types.Tool(
name="consolidation_recommendations",
description="""Get recommendations for consolidation based on current memory state.
Example:
{
"time_horizon": "monthly"
}""",
inputSchema={
"type": "object",
"properties": {
"time_horizon": {
"type": "string",
"enum": ["daily", "weekly", "monthly", "quarterly", "yearly"],
"description": "Time horizon to analyze for consolidation recommendations."
}
},
"required": ["time_horizon"]
}
),
types.Tool(
name="scheduler_status",
description="Get consolidation scheduler status and job information.",
inputSchema={"type": "object", "properties": {}}
),
types.Tool(
name="trigger_consolidation",
description="""Manually trigger a consolidation job.
Example:
{
"time_horizon": "weekly",
"immediate": true
}""",
inputSchema={
"type": "object",
"properties": {
"time_horizon": {
"type": "string",
"enum": ["daily", "weekly", "monthly", "quarterly", "yearly"],
"description": "Time horizon for the consolidation job."
},
"immediate": {
"type": "boolean",
"default": True,
"description": "Whether to run immediately or schedule for later."
}
},
"required": ["time_horizon"]
}
),
types.Tool(
name="pause_consolidation",
description="""Pause consolidation jobs.
Example:
{
"time_horizon": "weekly"
}""",
inputSchema={
"type": "object",
"properties": {
"time_horizon": {
"type": "string",
"enum": ["daily", "weekly", "monthly", "quarterly", "yearly"],
"description": "Specific time horizon to pause, or omit to pause all jobs."
}
}
}
),
types.Tool(
name="resume_consolidation",
description="""Resume consolidation jobs.
Example:
{
"time_horizon": "weekly"
}""",
inputSchema={
"type": "object",
"properties": {
"time_horizon": {
"type": "string",
"enum": ["daily", "weekly", "monthly", "quarterly", "yearly"],
"description": "Specific time horizon to resume, or omit to resume all jobs."
}
}
}
)
]
tools.extend(consolidation_tools)
logger.info(f"Added {len(consolidation_tools)} consolidation tools")
# Add document ingestion tools
ingestion_tools = [
types.Tool(
name="ingest_document",
description="""Ingest a single document file into the memory database.
Supports multiple formats:
- PDF files (.pdf)
- Text files (.txt, .md, .markdown, .rst)
- JSON files (.json)
The document will be parsed, chunked intelligently, and stored
as multiple memories with appropriate metadata.
Example:
{
"file_path": "/path/to/document.pdf",
"tags": ["documentation", "manual"],
"chunk_size": 1000
}""",
inputSchema={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the document file to ingest."
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Optional tags to apply to all memories created from this document.",
"default": []
},
"chunk_size": {
"type": "number",
"description": "Target size for text chunks in characters (default: 1000).",
"default": 1000
},
"chunk_overlap": {
"type": "number",
"description": "Characters to overlap between chunks (default: 200).",
"default": 200
},
"memory_type": {
"type": "string",
"description": "Type label for created memories (default: 'document').",
"default": "document"
}
},
"required": ["file_path"]
}
),
types.Tool(
name="ingest_directory",
description="""Batch ingest all supported documents from a directory.
Recursively processes all supported file types in the directory,
creating memories with consistent tagging and metadata.
Supported formats: PDF, TXT, MD, JSON
Example:
{
"directory_path": "/path/to/documents",
"tags": ["knowledge-base"],
"recursive": true,
"file_extensions": ["pdf", "md", "txt"]
}""",
inputSchema={
"type": "object",
"properties": {
"directory_path": {
"type": "string",
"description": "Path to the directory containing documents to ingest."
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Optional tags to apply to all memories created.",
"default": []
},
"recursive": {
"type": "boolean",
"description": "Whether to process subdirectories recursively (default: true).",
"default": True
},
"file_extensions": {
"type": "array",
"items": {"type": "string"},
"description": "File extensions to process (default: all supported).",
"default": ["pdf", "txt", "md", "json"]
},
"chunk_size": {
"type": "number",
"description": "Target size for text chunks in characters (default: 1000).",
"default": 1000
},
"max_files": {
"type": "number",
"description": "Maximum number of files to process (default: 100).",
"default": 100
}
},
"required": ["directory_path"]
}
)
]
tools.extend(ingestion_tools)
logger.info(f"Added {len(ingestion_tools)} ingestion tools")
logger.info(f"Returning {len(tools)} tools")
return tools
except Exception as e:
logger.error(f"Error in handle_list_tools: {str(e)}")
logger.error(traceback.format_exc())
raise
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: dict | None) -> List[types.TextContent]:
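"""Dispatch an incoming MCP tool call to the matching handler.
Unknown tool names raise ValueError; any other exception is logged
and returned to the client as an error TextContent message.
"""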
# Add immediate debugging to catch any protocol issues
if MCP_CLIENT == 'lm_studio':
print(f"TOOL CALL INTERCEPTED: {name}", file=sys.stdout, flush=True)
logger.info(f"=== HANDLING TOOL CALL: {name} ===")
logger.info(f"Arguments: {arguments}")
try:
if arguments is None:
arguments = {}
logger.info(f"Processing tool: {name}")
if MCP_CLIENT == 'lm_studio':
print(f"Processing tool: {name}", file=sys.stdout, flush=True)
if name == "store_memory":
return await self.handle_store_memory(arguments)
elif name == "retrieve_memory":
return await self.handle_retrieve_memory(arguments)
elif name == "recall_memory":
return await self.handle_recall_memory(arguments)
elif name == "search_by_tag":
return await self.handle_search_by_tag(arguments)
elif name == "delete_memory":
return await self.handle_delete_memory(arguments)
elif name == "delete_by_tag":
return await self.handle_delete_by_tag(arguments)
elif name == "delete_by_tags":
return await self.handle_delete_by_tags(arguments)
elif name == "delete_by_all_tags":
return await self.handle_delete_by_all_tags(arguments)
elif name == "cleanup_duplicates":
return await self.handle_cleanup_duplicates(arguments)
elif name == "debug_retrieve":
return await self.handle_debug_retrieve(arguments)
elif name == "exact_match_retrieve":
return await self.handle_exact_match_retrieve(arguments)
elif name == "check_database_health":
logger.info("Calling handle_check_database_health")
return await self.handle_check_database_health(arguments)
elif name == "recall_by_timeframe":
return await self.handle_recall_by_timeframe(arguments)
elif name == "delete_by_timeframe":
return await self.handle_delete_by_timeframe(arguments)
elif name == "delete_before_date":
return await self.handle_delete_before_date(arguments)
elif name == "update_memory_metadata":
logger.info("Calling handle_update_memory_metadata")
return await self.handle_update_memory_metadata(arguments)
# Consolidation tool handlers
elif name == "consolidate_memories":
logger.info("Calling handle_consolidate_memories")
return await self.handle_consolidate_memories(arguments)
elif name == "consolidation_status":
logger.info("Calling handle_consolidation_status")
return await self.handle_consolidation_status(arguments)
elif name == "consolidation_recommendations":
logger.info("Calling handle_consolidation_recommendations")
return await self.handle_consolidation_recommendations(arguments)
elif name == "scheduler_status":
logger.info("Calling handle_scheduler_status")
return await self.handle_scheduler_status(arguments)
elif name == "trigger_consolidation":
logger.info("Calling handle_trigger_consolidation")
return await self.handle_trigger_consolidation(arguments)
elif name == "pause_consolidation":
logger.info("Calling handle_pause_consolidation")
return await self.handle_pause_consolidation(arguments)
elif name == "resume_consolidation":
logger.info("Calling handle_resume_consolidation")
return await self.handle_resume_consolidation(arguments)
elif name == "ingest_document":
logger.info("Calling handle_ingest_document")
return await self.handle_ingest_document(arguments)
elif name == "ingest_directory":
logger.info("Calling handle_ingest_directory")
return await self.handle_ingest_directory(arguments)
else:
logger.warning(f"Unknown tool requested: {name}")
raise ValueError(f"Unknown tool: {name}")
except Exception as e:
error_msg = f"Error in {name}: {str(e)}\n{traceback.format_exc()}"
logger.error(error_msg)
print(f"ERROR in tool execution: {error_msg}", file=sys.stderr, flush=True)
return [types.TextContent(type="text", text=f"Error: {str(e)}")]
async def handle_store_memory(self, arguments: dict) -> List[types.TextContent]:
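"""Store a new memory from tool arguments.
Expects "content" (required) plus an optional "metadata" dict whose
"tags" may be a list or a comma-separated string and whose "type"
becomes the memory type; an optional "client_hostname" is used for
source tagging when INCLUDE_HOSTNAME is enabled.
"""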
content = arguments.get("content")
metadata = arguments.get("metadata", {})
if not content:
return [types.TextContent(type="text", text="Error: Content is required")]
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
# Normalize tags to a list
tags = metadata.get("tags", "")
if isinstance(tags, str):
tags = [tag.strip() for tag in tags.split(",") if tag.strip()]
elif isinstance(tags, list):
tags = [str(tag).strip() for tag in tags if str(tag).strip()]
else:
tags = [] # If tags is neither string nor list, default to empty list
# Add optional hostname tracking
final_metadata = metadata.copy()
if INCLUDE_HOSTNAME:
# Prioritize client-provided hostname, then fallback to server
client_hostname = arguments.get("client_hostname")
if client_hostname:
hostname = client_hostname
else:
hostname = socket.gethostname()
source_tag = f"source:{hostname}"
if source_tag not in tags:
tags.append(source_tag)
final_metadata["hostname"] = hostname
# Sanitize tags only after any source tag has been appended so the metadata copy matches memory.tags
sanitized_tags = storage.sanitized(tags)
# Create memory object
content_hash = generate_content_hash(content, final_metadata)
now = time.time()
memory = Memory(
content=content,
content_hash=content_hash,
tags=tags, # keep as a list for easier use in other methods
memory_type=final_metadata.get("type"),
metadata={**final_metadata, "tags": sanitized_tags},  # include the stringified tags in the metadata
created_at=now,
created_at_iso=datetime.utcfromtimestamp(now).isoformat() + "Z"
)
# Store memory
success, message = await storage.store(memory)
return [types.TextContent(type="text", text=message)]
except Exception as e:
logger.error(f"Error storing memory: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=f"Error storing memory: {str(e)}")]
async def handle_retrieve_memory(self, arguments: dict) -> List[types.TextContent]:
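"""Handler for semantic retrieval, returning up to n_results matches with relevance scores."""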
query = arguments.get("query")
n_results = arguments.get("n_results", 5)
if not query:
return [types.TextContent(type="text", text="Error: Query is required")]
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
# Track performance
start_time = time.time()
results = await storage.retrieve(query, n_results)
query_time_ms = (time.time() - start_time) * 1000
# Record query time for performance monitoring
self.record_query_time(query_time_ms)
if not results:
return [types.TextContent(type="text", text="No matching memories found")]
formatted_results = []
for i, result in enumerate(results):
memory_info = [f"Memory {i+1}:"]
timestamp = getattr(result.memory, "timestamp", None)
if timestamp:
memory_info.append(f"Timestamp: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
memory_info.extend([
f"Content: {result.memory.content}",
f"Hash: {result.memory.content_hash}",
f"Relevance Score: {result.relevance_score:.2f}"
])
if result.memory.tags:
memory_info.append(f"Tags: {', '.join(result.memory.tags)}")
memory_info.append("---")
formatted_results.append("\n".join(memory_info))
return [types.TextContent(
type="text",
text="Found the following memories:\n\n" + "\n".join(formatted_results)
)]
except Exception as e:
logger.error(f"Error retrieving memories: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=f"Error retrieving memories: {str(e)}")]
async def handle_search_by_tag(self, arguments: dict) -> List[types.TextContent]:
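"""Handler for searching memories that match any of the given tags."""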
tags = arguments.get("tags", [])
if not tags:
return [types.TextContent(type="text", text="Error: Tags are required")]
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
memories = await storage.search_by_tag(tags)
if not memories:
return [types.TextContent(
type="text",
text=f"No memories found with tags: {', '.join(tags)}"
)]
formatted_results = []
for i, memory in enumerate(memories):
memory_info = [f"Memory {i+1}:"]
timestamp = getattr(memory, "timestamp", None)
if timestamp:
memory_info.append(f"Timestamp: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
memory_info.extend([
f"Content: {memory.content}",
f"Hash: {memory.content_hash}",
f"Tags: {', '.join(memory.tags)}"
])
if memory.memory_type:
memory_info.append(f"Type: {memory.memory_type}")
memory_info.append("---")
formatted_results.append("\n".join(memory_info))
return [types.TextContent(
type="text",
text="Found the following memories:\n\n" + "\n".join(formatted_results)
)]
except Exception as e:
logger.error(f"Error searching by tags: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=f"Error searching by tags: {str(e)}")]
async def handle_delete_memory(self, arguments: dict) -> List[types.TextContent]:
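"""Handler for deleting a single memory by its content hash."""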
content_hash = arguments.get("content_hash")
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
success, message = await storage.delete(content_hash)
return [types.TextContent(type="text", text=message)]
except Exception as e:
logger.error(f"Error deleting memory: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=f"Error deleting memory: {str(e)}")]
async def handle_delete_by_tag(self, arguments: dict) -> List[types.TextContent]:
"""Handler for deleting memories by tags."""
tags = arguments.get("tags", [])
if not tags:
return [types.TextContent(type="text", text="Error: Tags array is required")]
# Convert single string to array if needed for backward compatibility
if isinstance(tags, str):
tags = [tags]
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
count, message = await storage.delete_by_tag(tags)
return [types.TextContent(type="text", text=message)]
except Exception as e:
logger.error(f"Error deleting by tag: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=f"Error deleting by tag: {str(e)}")]
async def handle_delete_by_tags(self, arguments: dict) -> List[types.TextContent]:
"""Handler for explicit multiple tag deletion with progress tracking."""
tags = arguments.get("tags", [])
if not tags:
return [types.TextContent(type="text", text="Error: Tags array is required")]
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
# Generate operation ID for progress tracking
import uuid
operation_id = f"delete_by_tags_{uuid.uuid4().hex[:8]}"
# Send initial progress notification
await self.send_progress_notification(operation_id, 0, f"Starting deletion of memories with tags: {', '.join(tags)}")
# Execute deletion with progress updates
await self.send_progress_notification(operation_id, 25, "Searching for memories to delete...")
# If storage supports progress callbacks, use them
if hasattr(storage, 'delete_by_tags_with_progress'):
count, message = await storage.delete_by_tags_with_progress(
tags,
progress_callback=lambda p, msg: asyncio.create_task(
self.send_progress_notification(operation_id, 25 + (p * 0.7), msg)
)
)
else:
await self.send_progress_notification(operation_id, 50, "Deleting memories...")
count, message = await storage.delete_by_tags(tags)
await self.send_progress_notification(operation_id, 90, f"Deleted {count} memories")
# Complete the operation
await self.send_progress_notification(operation_id, 100, f"Deletion completed: {message}")
return [types.TextContent(type="text", text=f"{message} (Operation ID: {operation_id})")]
except Exception as e:
logger.error(f"Error deleting by tags: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=f"Error deleting by tags: {str(e)}")]
async def handle_delete_by_all_tags(self, arguments: dict) -> List[types.TextContent]:
"""Handler for deleting memories that contain ALL specified tags."""
tags = arguments.get("tags", [])
if not tags:
return [types.TextContent(type="text", text="Error: Tags array is required")]
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
count, message = await storage.delete_by_all_tags(tags)
return [types.TextContent(type="text", text=message)]
except Exception as e:
logger.error(f"Error deleting by all tags: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=f"Error deleting by all tags: {str(e)}")]
async def handle_cleanup_duplicates(self, arguments: dict) -> List[types.TextContent]:
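"""Handler for removing duplicate memories via the storage backend."""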
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
count, message = await storage.cleanup_duplicates()
return [types.TextContent(type="text", text=message)]
except Exception as e:
logger.error(f"Error cleaning up duplicates: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=f"Error cleaning up duplicates: {str(e)}")]
async def handle_update_memory_metadata(self, arguments: dict) -> List[types.TextContent]:
"""Handle memory metadata update requests."""
try:
content_hash = arguments.get("content_hash")
updates = arguments.get("updates")
preserve_timestamps = arguments.get("preserve_timestamps", True)
if not content_hash:
return [types.TextContent(type="text", text="Error: content_hash is required")]
if not updates:
return [types.TextContent(type="text", text="Error: updates dictionary is required")]
if not isinstance(updates, dict):
return [types.TextContent(type="text", text="Error: updates must be a dictionary")]
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
# Call the storage method
success, message = await storage.update_memory_metadata(
content_hash=content_hash,
updates=updates,
preserve_timestamps=preserve_timestamps
)
if success:
logger.info(f"Successfully updated metadata for memory {content_hash}")
return [types.TextContent(
type="text",
text=f"Successfully updated memory metadata. {message}"
)]
else:
logger.warning(f"Failed to update metadata for memory {content_hash}: {message}")
return [types.TextContent(type="text", text=f"Failed to update memory metadata: {message}")]
except Exception as e:
error_msg = f"Error updating memory metadata: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=error_msg)]
# Consolidation tool handlers
async def handle_consolidate_memories(self, arguments: dict) -> List[types.TextContent]:
"""Handle memory consolidation requests."""
if not CONSOLIDATION_ENABLED or not self.consolidator:
return [types.TextContent(type="text", text="Error: Consolidation system not available")]
try:
time_horizon = arguments.get("time_horizon")
if not time_horizon:
return [types.TextContent(type="text", text="Error: time_horizon is required")]
if time_horizon not in ["daily", "weekly", "monthly", "quarterly", "yearly"]:
return [types.TextContent(type="text", text="Error: Invalid time_horizon. Must be one of: daily, weekly, monthly, quarterly, yearly")]
logger.info(f"Starting {time_horizon} consolidation")
# Run consolidation
report = await self.consolidator.consolidate(time_horizon)
# Format response
result = f"""Consolidation completed successfully!
Time Horizon: {report.time_horizon}
Duration: {(report.end_time - report.start_time).total_seconds():.2f} seconds
Memories Processed: {report.memories_processed}
Associations Discovered: {report.associations_discovered}
Clusters Created: {report.clusters_created}
Memories Compressed: {report.memories_compressed}
Memories Archived: {report.memories_archived}"""
if report.errors:
result += f"\n\nWarnings/Errors:\n" + "\n".join(f"- {error}" for error in report.errors)
return [types.TextContent(type="text", text=result)]
except Exception as e:
error_msg = f"Error during consolidation: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=error_msg)]
async def handle_consolidation_status(self, arguments: dict) -> List[types.TextContent]:
"""Handle consolidation status requests."""
if not CONSOLIDATION_ENABLED or not self.consolidator:
return [types.TextContent(type="text", text="Consolidation system: DISABLED")]
try:
# Get health check from consolidator
health = await self.consolidator.health_check()
# Format status report
status_lines = [
f"Consolidation System Status: {health['status'].upper()}",
f"Last Updated: {health['timestamp']}",
"",
"Component Health:"
]
for component, component_health in health['components'].items():
status = component_health['status']
status_lines.append(f" {component}: {status.upper()}")
if status == 'unhealthy' and 'error' in component_health:
status_lines.append(f" Error: {component_health['error']}")
status_lines.extend([
"",
"Statistics:",
f" Total consolidation runs: {health['statistics']['total_runs']}",
f" Successful runs: {health['statistics']['successful_runs']}",
f" Total memories processed: {health['statistics']['total_memories_processed']}",
f" Total associations created: {health['statistics']['total_associations_created']}",
f" Total clusters created: {health['statistics']['total_clusters_created']}",
f" Total memories compressed: {health['statistics']['total_memories_compressed']}",
f" Total memories archived: {health['statistics']['total_memories_archived']}"
])
if health['last_consolidation_times']:
status_lines.extend([
"",
"Last Consolidation Times:"
])
for horizon, timestamp in health['last_consolidation_times'].items():
status_lines.append(f" {horizon}: {timestamp}")
return [types.TextContent(type="text", text="\n".join(status_lines))]
except Exception as e:
error_msg = f"Error getting consolidation status: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=error_msg)]
async def handle_consolidation_recommendations(self, arguments: dict) -> List[types.TextContent]:
"""Handle consolidation recommendation requests."""
if not CONSOLIDATION_ENABLED or not self.consolidator:
return [types.TextContent(type="text", text="Error: Consolidation system not available")]
try:
time_horizon = arguments.get("time_horizon")
if not time_horizon:
return [types.TextContent(type="text", text="Error: time_horizon is required")]
if time_horizon not in ["daily", "weekly", "monthly", "quarterly", "yearly"]:
return [types.TextContent(type="text", text="Error: Invalid time_horizon")]
# Get recommendations
recommendations = await self.consolidator.get_consolidation_recommendations(time_horizon)
# Format response
lines = [
f"Consolidation Recommendations for {time_horizon} horizon:",
"",
f"Recommendation: {recommendations['recommendation'].upper()}",
f"Memory Count: {recommendations['memory_count']}",
]
if 'reasons' in recommendations:
lines.extend([
"",
"Reasons:"
])
for reason in recommendations['reasons']:
lines.append(f" • {reason}")
if 'memory_types' in recommendations:
lines.extend([
"",
"Memory Types:"
])
for mem_type, count in recommendations['memory_types'].items():
lines.append(f" {mem_type}: {count}")
if 'total_size_bytes' in recommendations:
size_mb = recommendations['total_size_bytes'] / (1024 * 1024)
lines.append(f"\nTotal Size: {size_mb:.2f} MB")
if 'old_memory_percentage' in recommendations:
lines.append(f"Old Memory Percentage: {recommendations['old_memory_percentage']:.1f}%")
if 'estimated_duration_seconds' in recommendations:
lines.append(f"Estimated Duration: {recommendations['estimated_duration_seconds']:.1f} seconds")
return [types.TextContent(type="text", text="\n".join(lines))]
except Exception as e:
error_msg = f"Error getting consolidation recommendations: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=error_msg)]
async def handle_scheduler_status(self, arguments: dict) -> List[types.TextContent]:
"""Handle scheduler status requests."""
if not CONSOLIDATION_ENABLED or not self.consolidation_scheduler:
return [types.TextContent(type="text", text="Consolidation scheduler: DISABLED")]
try:
# Get scheduler status
status = await self.consolidation_scheduler.get_scheduler_status()
if not status['enabled']:
return [types.TextContent(type="text", text=f"Scheduler: DISABLED\nReason: {status.get('reason', 'Unknown')}")]
# Format status report
lines = [
f"Consolidation Scheduler Status: {'RUNNING' if status['running'] else 'STOPPED'}",
"",
"Scheduled Jobs:"
]
for job in status['jobs']:
next_run = job['next_run_time'] or 'Not scheduled'
lines.append(f" {job['name']}: {next_run}")
lines.extend([
"",
"Execution Statistics:",
f" Total jobs executed: {status['execution_stats']['total_jobs']}",
f" Successful jobs: {status['execution_stats']['successful_jobs']}",
f" Failed jobs: {status['execution_stats']['failed_jobs']}"
])
if status['last_execution_times']:
lines.extend([
"",
"Last Execution Times:"
])
for horizon, timestamp in status['last_execution_times'].items():
lines.append(f" {horizon}: {timestamp}")
if status['recent_jobs']:
lines.extend([
"",
"Recent Jobs:"
])
for job in status['recent_jobs'][-5:]: # Show last 5 jobs
duration = (job['end_time'] - job['start_time']).total_seconds()
lines.append(f" {job['time_horizon']} ({job['status']}): {duration:.2f}s")
return [types.TextContent(type="text", text="\n".join(lines))]
except Exception as e:
error_msg = f"Error getting scheduler status: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=error_msg)]
async def handle_trigger_consolidation(self, arguments: dict) -> List[types.TextContent]:
"""Handle manual consolidation trigger requests."""
if not CONSOLIDATION_ENABLED or not self.consolidation_scheduler:
return [types.TextContent(type="text", text="Error: Consolidation scheduler not available")]
try:
time_horizon = arguments.get("time_horizon")
immediate = arguments.get("immediate", True)
if not time_horizon:
return [types.TextContent(type="text", text="Error: time_horizon is required")]
if time_horizon not in ["daily", "weekly", "monthly", "quarterly", "yearly"]:
return [types.TextContent(type="text", text="Error: Invalid time_horizon")]
# Trigger consolidation
success = await self.consolidation_scheduler.trigger_consolidation(time_horizon, immediate)
if success:
action = "triggered immediately" if immediate else "scheduled for later"
return [types.TextContent(type="text", text=f"Successfully {action} {time_horizon} consolidation")]
else:
return [types.TextContent(type="text", text=f"Failed to trigger {time_horizon} consolidation")]
except Exception as e:
error_msg = f"Error triggering consolidation: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=error_msg)]
async def handle_pause_consolidation(self, arguments: dict) -> List[types.TextContent]:
"""Handle consolidation pause requests."""
if not CONSOLIDATION_ENABLED or not self.consolidation_scheduler:
return [types.TextContent(type="text", text="Error: Consolidation scheduler not available")]
try:
time_horizon = arguments.get("time_horizon")
# Pause consolidation
success = await self.consolidation_scheduler.pause_consolidation(time_horizon)
if success:
target = time_horizon or "all"
return [types.TextContent(type="text", text=f"Successfully paused {target} consolidation jobs")]
else:
return [types.TextContent(type="text", text="Failed to pause consolidation jobs")]
except Exception as e:
error_msg = f"Error pausing consolidation: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=error_msg)]
async def handle_resume_consolidation(self, arguments: dict) -> List[types.TextContent]:
"""Handle consolidation resume requests."""
if not CONSOLIDATION_ENABLED or not self.consolidation_scheduler:
return [types.TextContent(type="text", text="Error: Consolidation scheduler not available")]
try:
time_horizon = arguments.get("time_horizon")
# Resume consolidation
success = await self.consolidation_scheduler.resume_consolidation(time_horizon)
if success:
target = time_horizon or "all"
return [types.TextContent(type="text", text=f"Successfully resumed {target} consolidation jobs")]
else:
return [types.TextContent(type="text", text="Failed to resume consolidation jobs")]
except Exception as e:
error_msg = f"Error resuming consolidation: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=error_msg)]
async def handle_debug_retrieve(self, arguments: dict) -> List[types.TextContent]:
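"""Handler for retrieval with debug info (raw similarity scores and distances) to diagnose search quality."""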
query = arguments.get("query")
n_results = arguments.get("n_results", 5)
similarity_threshold = arguments.get("similarity_threshold", 0.0)
if not query:
return [types.TextContent(type="text", text="Error: Query is required")]
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
from .utils.debug import debug_retrieve_memory
results = await debug_retrieve_memory(
storage,
query,
n_results,
similarity_threshold
)
if not results:
return [types.TextContent(type="text", text="No matching memories found")]
formatted_results = []
for i, result in enumerate(results):
memory_info = [
f"Memory {i+1}:",
f"Content: {result.memory.content}",
f"Hash: {result.memory.content_hash}",
f"Raw Similarity Score: {result.debug_info['raw_similarity']:.4f}",
f"Raw Distance: {result.debug_info['raw_distance']:.4f}",
f"Memory ID: {result.debug_info['memory_id']}"
]
if result.memory.tags:
memory_info.append(f"Tags: {', '.join(result.memory.tags)}")
memory_info.append("---")
formatted_results.append("\n".join(memory_info))
return [types.TextContent(
type="text",
text="Found the following memories:\n\n" + "\n".join(formatted_results)
)]
except Exception as e:
return [types.TextContent(type="text", text=f"Error in debug retrieve: {str(e)}")]
async def handle_exact_match_retrieve(self, arguments: dict) -> List[types.TextContent]:
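"""Handler for retrieving memories whose content exactly matches the given string."""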
content = arguments.get("content")
if not content:
return [types.TextContent(type="text", text="Error: Content is required")]
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
from .utils.debug import exact_match_retrieve
memories = await exact_match_retrieve(storage, content)
if not memories:
return [types.TextContent(type="text", text="No exact matches found")]
formatted_results = []
for i, memory in enumerate(memories):
memory_info = [
f"Memory {i+1}:",
f"Content: {memory.content}",
f"Hash: {memory.content_hash}"
]
if memory.tags:
memory_info.append(f"Tags: {', '.join(memory.tags)}")
memory_info.append("---")
formatted_results.append("\n".join(memory_info))
return [types.TextContent(
type="text",
text="Found the following exact matches:\n\n" + "\n".join(formatted_results)
)]
except Exception as e:
return [types.TextContent(type="text", text=f"Error in exact match retrieve: {str(e)}")]
async def handle_recall_memory(self, arguments: dict) -> List[types.TextContent]:
"""
Handle memory recall requests with natural language time expressions.
This handler parses natural language time expressions from the query,
extracts time ranges, and combines them with optional semantic search.
"""
query = arguments.get("query", "")
n_results = arguments.get("n_results", 5)
if not query:
return [types.TextContent(type="text", text="Error: Query is required")]
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
# Parse natural language time expressions
cleaned_query, (start_timestamp, end_timestamp) = extract_time_expression(query)
# Log the parsed timestamps and clean query
logger.info(f"Original query: {query}")
logger.info(f"Cleaned query for semantic search: {cleaned_query}")
logger.info(f"Parsed time range: {start_timestamp} to {end_timestamp}")
# Log more detailed timestamp information for debugging
if start_timestamp is not None:
start_dt = datetime.fromtimestamp(start_timestamp)
logger.info(f"Start timestamp: {start_timestamp} ({start_dt.strftime('%Y-%m-%d %H:%M:%S')})")
if end_timestamp is not None:
end_dt = datetime.fromtimestamp(end_timestamp)
logger.info(f"End timestamp: {end_timestamp} ({end_dt.strftime('%Y-%m-%d %H:%M:%S')})")
if start_timestamp is None and end_timestamp is None:
# No time expression found, try direct parsing
logger.info("No time expression found in query, trying direct parsing")
start_timestamp, end_timestamp = parse_time_expression(query)
logger.info(f"Direct parse result: {start_timestamp} to {end_timestamp}")
# Format human-readable time range for response
time_range_str = ""
if start_timestamp is not None and end_timestamp is not None:
start_dt = datetime.fromtimestamp(start_timestamp)
end_dt = datetime.fromtimestamp(end_timestamp)
time_range_str = f" from {start_dt.strftime('%Y-%m-%d %H:%M')} to {end_dt.strftime('%Y-%m-%d %H:%M')}"
# Retrieve memories with timestamp filter and optional semantic search
# If cleaned_query is empty or just whitespace after removing time expressions,
# we should perform time-based retrieval only
semantic_query = cleaned_query.strip() if cleaned_query.strip() else None
# Use the enhanced recall method that combines semantic search with time filtering,
# or just time filtering if no semantic query
results = await storage.recall(
query=semantic_query,
n_results=n_results,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp
)
if not results:
no_results_msg = f"No memories found{time_range_str}"
return [types.TextContent(type="text", text=no_results_msg)]
# Format results
formatted_results = []
for i, result in enumerate(results):
memory_dt = result.memory.timestamp
memory_info = [
f"Memory {i+1}:",
]
# Add timestamp if available
if memory_dt:
memory_info.append(f"Timestamp: {memory_dt.strftime('%Y-%m-%d %H:%M:%S')}")
# Add other memory information
memory_info.extend([
f"Content: {result.memory.content}",
f"Hash: {result.memory.content_hash}"
])
# Add relevance score if available (may not be for time-only queries)
if hasattr(result, 'relevance_score') and result.relevance_score is not None:
memory_info.append(f"Relevance Score: {result.relevance_score:.2f}")
# Add tags if available
if result.memory.tags:
memory_info.append(f"Tags: {', '.join(result.memory.tags)}")
memory_info.append("---")
formatted_results.append("\n".join(memory_info))
# Include time range in response if available
found_msg = f"Found {len(results)} memories{time_range_str}:"
return [types.TextContent(
type="text",
text=f"{found_msg}\n\n" + "\n".join(formatted_results)
)]
except Exception as e:
logger.error(f"Error in recall_memory: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(type="text", text=f"Error recalling memories: {str(e)}")]
async def handle_check_database_health(self, arguments: dict) -> List[types.TextContent]:
"""Handle database health check requests with performance metrics."""
logger.info("=== EXECUTING CHECK_DATABASE_HEALTH ===")
try:
# Initialize storage lazily when needed
try:
storage = await self._ensure_storage_initialized()
except Exception as init_error:
# Storage initialization failed
result = {
"validation": {
"status": "unhealthy",
"message": f"Storage initialization failed: {str(init_error)}"
},
"statistics": {
"status": "error",
"error": "Cannot get statistics - storage not initialized"
},
"performance": {
"storage": {},
"server": {
"average_query_time_ms": self.get_average_query_time(),
"total_queries": len(self.query_times)
}
}
}
logger.error(f"Storage initialization failed during health check: {str(init_error)}")
return [types.TextContent(
type="text",
text=f"Database Health Check Results:\n{json.dumps(result, indent=2)}"
)]
# Skip db_utils completely for health check - implement directly here
# Get storage type for backend-specific handling
storage_type = storage.__class__.__name__
# Direct health check implementation based on storage type
is_valid = False
message = ""
stats = {}
if storage_type == "SqliteVecMemoryStorage":
# Direct SQLite-vec validation
if not hasattr(storage, 'conn') or storage.conn is None:
is_valid = False
message = "SQLite database connection is not initialized"
else:
try:
# Check for required tables
cursor = storage.conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='memories'")
if not cursor.fetchone():
is_valid = False
message = "SQLite database is missing required tables"
else:
# Count memories
cursor = storage.conn.execute('SELECT COUNT(*) FROM memories')
memory_count = cursor.fetchone()[0]
# Check if embedding tables exist
cursor = storage.conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='memory_embeddings'")
has_embeddings = cursor.fetchone() is not None
# Check embedding model
has_model = hasattr(storage, 'embedding_model') and storage.embedding_model is not None
# Collect stats
stats = {
"status": "healthy",
"backend": "sqlite-vec",
"total_memories": memory_count,
"has_embedding_tables": has_embeddings,
"has_embedding_model": has_model,
"embedding_model": storage.embedding_model_name if hasattr(storage, 'embedding_model_name') else "none"
}
# Get database file size
db_path = storage.db_path if hasattr(storage, 'db_path') else None
if db_path and os.path.exists(db_path):
file_size = os.path.getsize(db_path)
stats["database_size_bytes"] = file_size
stats["database_size_mb"] = round(file_size / (1024 * 1024), 2)
is_valid = True
message = "SQLite-vec database validation successful"
except Exception as e:
is_valid = False
message = f"SQLite database validation error: {str(e)}"
stats = {
"status": "error",
"error": str(e),
"backend": "sqlite-vec"
}
elif storage_type == "CloudflareStorage":
# Cloudflare storage validation
try:
# Check if storage is properly initialized
if not hasattr(storage, 'client') or storage.client is None:
is_valid = False
message = "Cloudflare storage client is not initialized"
stats = {
"status": "error",
"error": "Cloudflare storage client is not initialized",
"backend": "cloudflare"
}
else:
# Get storage stats
storage_stats = await storage.get_stats()
# Collect basic health info
stats = {
"status": "healthy",
"backend": "cloudflare",
"total_memories": storage_stats.get("total_memories", 0),
"vectorize_index": storage.vectorize_index,
"d1_database_id": storage.d1_database_id,
"r2_bucket": storage.r2_bucket,
"embedding_model": storage.embedding_model
}
# Add additional stats if available
stats.update(storage_stats)
is_valid = True
message = "Cloudflare storage validation successful"
except Exception as e:
is_valid = False
message = f"Cloudflare storage validation error: {str(e)}"
stats = {
"status": "error",
"error": str(e),
"backend": "cloudflare"
}
elif storage_type == "HybridMemoryStorage":
# Hybrid storage validation (SQLite-vec primary + Cloudflare secondary)
try:
if not hasattr(storage, 'primary') or storage.primary is None:
is_valid = False
message = "Hybrid storage primary backend is not initialized"
stats = {
"status": "error",
"error": "Hybrid storage primary backend is not initialized",
"backend": "hybrid"
}
else:
primary_storage = storage.primary
# Validate primary storage (SQLite-vec)
if not hasattr(primary_storage, 'conn') or primary_storage.conn is None:
is_valid = False
message = "Hybrid storage: SQLite connection is not initialized"
stats = {
"status": "error",
"error": "SQLite connection is not initialized",
"backend": "hybrid"
}
else:
# Check for required tables
cursor = primary_storage.conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='memories'")
if not cursor.fetchone():
is_valid = False
message = "Hybrid storage: SQLite database is missing required tables"
stats = {
"status": "error",
"error": "SQLite database is missing required tables",
"backend": "hybrid"
}
else:
# Count memories
cursor = primary_storage.conn.execute('SELECT COUNT(*) FROM memories')
memory_count = cursor.fetchone()[0]
# Check if embedding tables exist
cursor = primary_storage.conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='memory_embeddings'")
has_embeddings = cursor.fetchone() is not None
# Check secondary (Cloudflare) status
cloudflare_status = "not_configured"
if hasattr(storage, 'secondary') and storage.secondary:
sync_service = getattr(storage, 'sync_service', None)
if sync_service and getattr(sync_service, 'is_running', False):
cloudflare_status = "syncing"
else:
cloudflare_status = "configured"
# Collect stats
stats = {
"status": "healthy",
"backend": "hybrid",
"total_memories": memory_count,
"has_embeddings": has_embeddings,
"database_path": getattr(primary_storage, 'db_path', SQLITE_VEC_PATH),
"cloudflare_sync": cloudflare_status
}
is_valid = True
message = f"Hybrid storage validation successful ({memory_count} memories, Cloudflare: {cloudflare_status})"
except Exception as e:
is_valid = False
message = f"Hybrid storage validation error: {str(e)}"
stats = {
"status": "error",
"error": str(e),
"backend": "hybrid"
}
else:
is_valid = False
message = f"Unknown storage type: {storage_type}"
stats = {
"status": "error",
"error": f"Unknown storage type: {storage_type}",
"backend": "unknown"
}
# Get performance stats from optimized storage
performance_stats = {}
if hasattr(storage, 'get_performance_stats') and callable(storage.get_performance_stats):
try:
performance_stats = storage.get_performance_stats()
except Exception as perf_error:
logger.warning(f"Could not get performance stats: {str(perf_error)}")
performance_stats = {"error": str(perf_error)}
# Get server-level performance stats
server_stats = {
"average_query_time_ms": self.get_average_query_time(),
"total_queries": len(self.query_times)
}
# Add storage type for debugging
server_stats["storage_type"] = storage_type
# Add storage initialization status for debugging
if hasattr(storage, 'get_initialization_status') and callable(storage.get_initialization_status):
try:
server_stats["storage_initialization"] = storage.get_initialization_status()
except Exception:
pass
# Combine results with performance data
result = {
"version": __version__,
"validation": {
"status": "healthy" if is_valid else "unhealthy",
"message": message
},
"statistics": stats,
"performance": {
"storage": performance_stats,
"server": server_stats
}
}
logger.info(f"Database health result with performance data: {result}")
return [types.TextContent(
type="text",
text=f"Database Health Check Results:\n{json.dumps(result, indent=2)}"
)]
except Exception as e:
logger.error(f"Error in check_database_health: {str(e)}")
logger.error(traceback.format_exc())
return [types.TextContent(
type="text",
text=f"Error checking database health: {str(e)}"
)]
async def handle_recall_by_timeframe(self, arguments: dict) -> List[types.TextContent]:
"""Handle recall by timeframe requests."""
from datetime import datetime
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
start_date = datetime.fromisoformat(arguments["start_date"]).date()
end_date = datetime.fromisoformat(arguments.get("end_date", arguments["start_date"])).date()
n_results = arguments.get("n_results", 5)
# Get timestamp range
start_timestamp = datetime(start_date.year, start_date.month, start_date.day).timestamp()
end_timestamp = datetime(end_date.year, end_date.month, end_date.day, 23, 59, 59).timestamp()
# Log the timestamp values for debugging
logger.info(f"Recall by timeframe: {start_date} to {end_date}")
logger.info(f"Start timestamp: {start_timestamp} ({datetime.fromtimestamp(start_timestamp).strftime('%Y-%m-%d %H:%M:%S')})")
logger.info(f"End timestamp: {end_timestamp} ({datetime.fromtimestamp(end_timestamp).strftime('%Y-%m-%d %H:%M:%S')})")
# Retrieve memories with proper parameters - query is None because this is pure time-based filtering
results = await storage.recall(
query=None,
n_results=n_results,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp
)
if not results:
return [types.TextContent(type="text", text=f"No memories found from {start_date} to {end_date}")]
formatted_results = []
for i, result in enumerate(results):
memory_timestamp = result.memory.timestamp
memory_info = [
f"Memory {i+1}:",
]
# Add timestamp if available
if memory_timestamp:
memory_info.append(f"Timestamp: {memory_timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
memory_info.extend([
f"Content: {result.memory.content}",
f"Hash: {result.memory.content_hash}"
])
if result.memory.tags:
memory_info.append(f"Tags: {', '.join(result.memory.tags)}")
memory_info.append("---")
formatted_results.append("\n".join(memory_info))
return [types.TextContent(
type="text",
text=f"Found {len(results)} memories from {start_date} to {end_date}:\n\n" + "\n".join(formatted_results)
)]
except Exception as e:
logger.error(f"Error in recall_by_timeframe: {str(e)}\n{traceback.format_exc()}")
return [types.TextContent(
type="text",
text=f"Error recalling memories: {str(e)}"
)]
async def handle_delete_by_timeframe(self, arguments: dict) -> List[types.TextContent]:
"""Handle delete by timeframe requests."""
from datetime import datetime
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
start_date = datetime.fromisoformat(arguments["start_date"]).date()
end_date = datetime.fromisoformat(arguments.get("end_date", arguments["start_date"])).date()
tag = arguments.get("tag")
count, message = await storage.delete_by_timeframe(start_date, end_date, tag)
return [types.TextContent(
type="text",
text=f"Deleted {count} memories: {message}"
)]
except Exception as e:
return [types.TextContent(
type="text",
text=f"Error deleting memories: {str(e)}"
)]
async def handle_delete_before_date(self, arguments: dict) -> List[types.TextContent]:
"""Handle delete before date requests."""
from datetime import datetime
try:
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
before_date = datetime.fromisoformat(arguments["before_date"]).date()
tag = arguments.get("tag")
count, message = await storage.delete_before_date(before_date, tag)
return [types.TextContent(
type="text",
text=f"Deleted {count} memories: {message}"
)]
except Exception as e:
return [types.TextContent(
type="text",
text=f"Error deleting memories: {str(e)}"
)]
async def handle_ingest_document(self, arguments: dict) -> List[types.TextContent]:
"""Handle document ingestion requests."""
try:
from pathlib import Path
from .ingestion import get_loader_for_file
from .models.memory import Memory
from .utils import generate_content_hash
import time
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
file_path = Path(arguments["file_path"])
tags = arguments.get("tags", [])
chunk_size = arguments.get("chunk_size", 1000)
chunk_overlap = arguments.get("chunk_overlap", 200)
memory_type = arguments.get("memory_type", "document")
logger.info(f"Starting document ingestion: {file_path}")
start_time = time.time()
# Validate file exists and get appropriate document loader
if not file_path.exists():
return [types.TextContent(
type="text",
text=f"Error: File not found: {file_path.resolve()}"
)]
loader = get_loader_for_file(file_path)
if loader is None:
from .ingestion import SUPPORTED_FORMATS
supported_exts = ", ".join(f".{ext}" for ext in SUPPORTED_FORMATS.keys())
return [types.TextContent(
type="text",
text=f"Error: Unsupported file format: {file_path.suffix}. Supported formats: {supported_exts}"
)]
# Configure loader
loader.chunk_size = chunk_size
loader.chunk_overlap = chunk_overlap
chunks_processed = 0
chunks_stored = 0
errors = []
# Extract and store chunks
async for chunk in loader.extract_chunks(file_path):
chunks_processed += 1
try:
# Combine document tags with chunk metadata tags
all_tags = tags.copy()
if chunk.metadata.get('tags'):
all_tags.extend(chunk.metadata['tags'])
# Create memory object
memory = Memory(
content=chunk.content,
content_hash=generate_content_hash(chunk.content, chunk.metadata),
tags=list(set(all_tags)), # Remove duplicates
memory_type=memory_type,
metadata=chunk.metadata
)
# Store the memory
success, error = await storage.store(memory)
if success:
chunks_stored += 1
else:
errors.append(f"Chunk {chunk.chunk_index}: {error}")
except Exception as e:
errors.append(f"Chunk {chunk.chunk_index}: {str(e)}")
processing_time = time.time() - start_time
success_rate = (chunks_stored / chunks_processed * 100) if chunks_processed > 0 else 0
# Prepare result message
result_lines = [
f"✅ Document ingestion completed: {file_path.name}",
f"📄 Chunks processed: {chunks_processed}",
f"💾 Chunks stored: {chunks_stored}",
f"⚡ Success rate: {success_rate:.1f}%",
f"⏱️ Processing time: {processing_time:.2f} seconds"
]
if errors:
result_lines.append(f"⚠️ Errors encountered: {len(errors)}")
if len(errors) <= 5: # Show first few errors
result_lines.extend([f" - {error}" for error in errors[:5]])
else:
result_lines.extend([f" - {error}" for error in errors[:3]])
result_lines.append(f" ... and {len(errors) - 3} more errors")
logger.info(f"Document ingestion completed: {chunks_stored}/{chunks_processed} chunks stored")
return [types.TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
logger.error(f"Error in document ingestion: {str(e)}")
return [types.TextContent(
type="text",
text=f"Error ingesting document: {str(e)}"
)]
async def handle_ingest_directory(self, arguments: dict) -> List[types.TextContent]:
"""Handle directory ingestion requests."""
try:
from pathlib import Path
from .ingestion import get_loader_for_file, is_supported_file
from .models.memory import Memory
from .utils import generate_content_hash
import time
# Initialize storage lazily when needed
storage = await self._ensure_storage_initialized()
directory_path = Path(arguments["directory_path"])
tags = arguments.get("tags", [])
recursive = arguments.get("recursive", True)
file_extensions = arguments.get("file_extensions", ["pdf", "txt", "md", "json"])
chunk_size = arguments.get("chunk_size", 1000)
max_files = arguments.get("max_files", 100)
if not directory_path.exists() or not directory_path.is_dir():
return [types.TextContent(
type="text",
text=f"Error: Directory not found: {directory_path}"
)]
logger.info(f"Starting directory ingestion: {directory_path}")
start_time = time.time()
# Find all supported files
pattern = "**/*" if recursive else "*"
all_files = []
for ext in file_extensions:
ext_pattern = f"*.{ext.lstrip('.')}"
if recursive:
files = list(directory_path.rglob(ext_pattern))
else:
files = list(directory_path.glob(ext_pattern))
all_files.extend(files)
# Remove duplicates and filter supported files
unique_files = []
seen = set()
for file_path in all_files:
if file_path not in seen and is_supported_file(file_path):
unique_files.append(file_path)
seen.add(file_path)
# Limit number of files
files_to_process = unique_files[:max_files]
if not files_to_process:
return [types.TextContent(
type="text",
text=f"No supported files found in directory: {directory_path}"
)]
total_chunks_processed = 0
total_chunks_stored = 0
files_processed = 0
files_failed = 0
all_errors = []
# Process each file
for file_path in files_to_process:
try:
logger.info(f"Processing file {files_processed + 1}/{len(files_to_process)}: {file_path.name}")
# Get appropriate document loader
loader = get_loader_for_file(file_path)
if loader is None:
all_errors.append(f"{file_path.name}: Unsupported format")
files_failed += 1
continue
# Configure loader
loader.chunk_size = chunk_size
file_chunks_processed = 0
file_chunks_stored = 0
# Extract and store chunks from this file
async for chunk in loader.extract_chunks(file_path):
file_chunks_processed += 1
total_chunks_processed += 1
try:
# Add directory-level tags and file-specific tags
all_tags = tags.copy()
all_tags.append(f"source_dir:{directory_path.name}")
all_tags.append(f"file_type:{file_path.suffix.lstrip('.')}")
if chunk.metadata.get('tags'):
all_tags.extend(chunk.metadata['tags'])
# Create memory object
memory = Memory(
content=chunk.content,
content_hash=generate_content_hash(chunk.content, chunk.metadata),
tags=list(set(all_tags)), # Remove duplicates
memory_type="document",
metadata=chunk.metadata
)
# Store the memory
success, error = await storage.store(memory)
if success:
file_chunks_stored += 1
total_chunks_stored += 1
else:
all_errors.append(f"{file_path.name} chunk {chunk.chunk_index}: {error}")
except Exception as e:
all_errors.append(f"{file_path.name} chunk {chunk.chunk_index}: {str(e)}")
if file_chunks_stored > 0:
files_processed += 1
else:
files_failed += 1
except Exception as e:
files_failed += 1
all_errors.append(f"{file_path.name}: {str(e)}")
processing_time = time.time() - start_time
success_rate = (total_chunks_stored / total_chunks_processed * 100) if total_chunks_processed > 0 else 0
# Prepare result message
result_lines = [
f"✅ Directory ingestion completed: {directory_path.name}",
f"📁 Files processed: {files_processed}/{len(files_to_process)}",
f"📄 Total chunks processed: {total_chunks_processed}",
f"💾 Total chunks stored: {total_chunks_stored}",
f"⚡ Success rate: {success_rate:.1f}%",
f"⏱️ Processing time: {processing_time:.2f} seconds"
]
if files_failed > 0:
result_lines.append(f"❌ Files failed: {files_failed}")
if all_errors:
result_lines.append(f"⚠️ Total errors: {len(all_errors)}")
# Show first few errors
error_limit = 5
for error in all_errors[:error_limit]:
result_lines.append(f" - {error}")
if len(all_errors) > error_limit:
result_lines.append(f" ... and {len(all_errors) - error_limit} more errors")
logger.info(f"Directory ingestion completed: {total_chunks_stored}/{total_chunks_processed} chunks from {files_processed} files")
return [types.TextContent(type="text", text="\n".join(result_lines))]
except Exception as e:
logger.error(f"Error in directory ingestion: {str(e)}")
return [types.TextContent(
type="text",
text=f"Error ingesting directory: {str(e)}"
)]
async def async_main():
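"""Async entry point: apply client compatibility patches, run environment checks,
initialize the MemoryServer (with timeout and retries), then serve either in
standalone mode or over stdio.
"""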
# Apply LM Studio compatibility patch before anything else
patch_mcp_for_lm_studio()
# Add Windows-specific timeout handling
add_windows_timeout_handling()
# Run dependency check before starting
run_dependency_check()
# Check if running with UV
check_uv_environment()
# Debug logging is now handled by the CLI layer
# Print system diagnostics only for LM Studio (avoid JSON parsing errors in Claude Desktop)
system_info = get_system_info()
if MCP_CLIENT == 'lm_studio':
print("\n=== MCP Memory Service System Diagnostics ===", file=sys.stdout, flush=True)
print(f"OS: {system_info.os_name} {system_info.architecture}", file=sys.stdout, flush=True)
print(f"Python: {platform.python_version()}", file=sys.stdout, flush=True)
print(f"Hardware Acceleration: {system_info.accelerator}", file=sys.stdout, flush=True)
print(f"Memory: {system_info.memory_gb:.2f} GB", file=sys.stdout, flush=True)
print(f"Optimal Model: {system_info.get_optimal_model()}", file=sys.stdout, flush=True)
print(f"Optimal Batch Size: {system_info.get_optimal_batch_size()}", file=sys.stdout, flush=True)
print(f"Storage Backend: {STORAGE_BACKEND}", file=sys.stdout, flush=True)
print("================================================\n", file=sys.stdout, flush=True)
logger.info(f"Starting MCP Memory Service with storage backend: {STORAGE_BACKEND}")
try:
# Create server instance with hardware-aware configuration
memory_server = MemoryServer()
# Set up async initialization with timeout and retry logic
max_retries = 2
retry_count = 0
init_success = False
while retry_count <= max_retries and not init_success:
if retry_count > 0:
logger.warning(f"Retrying initialization (attempt {retry_count}/{max_retries})...")
init_task = asyncio.create_task(memory_server.initialize())
try:
# 30 second timeout for initialization
init_success = await asyncio.wait_for(init_task, timeout=30.0)
if init_success:
logger.info("Async initialization completed successfully")
else:
logger.warning("Initialization returned failure status")
retry_count += 1
except asyncio.TimeoutError:
logger.warning("Async initialization timed out. Continuing with server startup.")
# Don't cancel the task, let it complete in the background
break
except Exception as init_error:
logger.error(f"Initialization error: {str(init_error)}")
logger.error(traceback.format_exc())
retry_count += 1
if retry_count <= max_retries:
logger.info(f"Waiting 2 seconds before retry...")
await asyncio.sleep(2)
# Check if running in standalone mode (Docker without active client)
standalone_mode = os.environ.get('MCP_STANDALONE_MODE', '').lower() == '1'
running_in_docker = os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER', False)
if standalone_mode:
logger.info("Running in standalone mode - keeping server alive without active client")
if MCP_CLIENT == 'lm_studio':
print("MCP Memory Service running in standalone mode", file=sys.stdout, flush=True)
# Keep the server running indefinitely
try:
while True:
await asyncio.sleep(60) # Sleep for 60 seconds at a time
logger.debug("Standalone server heartbeat")
except asyncio.CancelledError:
logger.info("Standalone server cancelled")
raise
else:
# Start the server with stdio
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
logger.info("Server started and ready to handle requests")
if running_in_docker:
logger.info("Detected Docker environment - ensuring proper stdio handling")
if MCP_CLIENT == 'lm_studio':
print("MCP Memory Service running in Docker container", file=sys.stdout, flush=True)
try:
await memory_server.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=SERVER_NAME,
server_version=SERVER_VERSION,
# Pin the protocol version to the one Claude Desktop requests
# so the initialization handshake succeeds across supported clients
protocol_version="2024-11-05",
capabilities=memory_server.server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={
"hardware_info": {
"architecture": system_info.architecture,
"accelerator": system_info.accelerator,
"memory_gb": system_info.memory_gb,
"cpu_count": system_info.cpu_count
}
},
),
),
)
except asyncio.CancelledError:
logger.info("Server run cancelled")
raise
except BaseException as e:
# Handle ExceptionGroup specially (Python 3.11+)
if type(e).__name__ == 'ExceptionGroup' or 'ExceptionGroup' in str(type(e)):
error_str = str(e)
# Check if this contains the LM Studio cancelled notification error
if 'notifications/cancelled' in error_str or 'ValidationError' in error_str:
logger.info("LM Studio sent a cancelled notification - this is expected behavior")
logger.debug(f"Full error for debugging: {error_str}")
# Don't re-raise - just continue gracefully
else:
logger.error(f"ExceptionGroup in server.run: {str(e)}")
logger.error(traceback.format_exc())
raise
else:
logger.error(f"Error in server.run: {str(e)}")
logger.error(traceback.format_exc())
raise
finally:
logger.info("Server run completed")
except Exception as e:
logger.error(f"Server error: {str(e)}")
logger.error(traceback.format_exc())
print(f"Fatal server error: {str(e)}", file=sys.stderr, flush=True)
raise
def main():
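"""Synchronous entry point: install signal handlers for graceful shutdown and run async_main()."""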
import signal
# Set up signal handlers for graceful shutdown
def signal_handler(signum, frame):
logger.info(f"Received signal {signum}, shutting down gracefully...")
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
try:
# Check if running in Docker
if os.path.exists('/.dockerenv') or os.environ.get('DOCKER_CONTAINER', False):
logger.info("Running in Docker container")
if MCP_CLIENT == 'lm_studio':
print("MCP Memory Service starting in Docker mode", file=sys.stdout, flush=True)
asyncio.run(async_main())
except KeyboardInterrupt:
logger.info("Shutting down gracefully...")
except Exception as e:
logger.error(f"Fatal error: {str(e)}\n{traceback.format_exc()}")
sys.exit(1)
if __name__ == "__main__":
main()