# MCP-Standards v2.0: AgentDB Integration Architecture Specification

**Version**: 2.0.0
**Date**: 2025-10-20
**Status**: Technical Design - Ready for Implementation
**Author**: System Architecture Designer

---

## Executive Summary

This document provides the complete technical architecture for integrating AgentDB ultra-fast vector memory into mcp-standards v2.0. The integration transforms the system from a reactive SQLite-based pattern logger into an intelligent, semantic-aware context management system with <10ms startup and <1ms search performance.

**Key Objectives**:

- 50-100x performance improvement via HNSW vector search
- Semantic pattern matching (reduce corrections from 3-5 to 1-2)
- Event-driven CLAUDE.md auto-updates (zero manual triggers)
- Hybrid architecture (AgentDB hot path + SQLite cold path)
- Backward compatibility with v1 data

**Performance Targets**:

| Metric | v1 Current | v2 Target | Improvement |
|--------|-----------|-----------|-------------|
| Startup Time | 500ms | <10ms | 50x faster |
| Pattern Search | 50ms+ | <1ms | 50x+ faster |
| Corrections to Learn | 3-5 | 1-2 | 60-70% reduction |
| Context Tokens | 23,000 | 5,000 | 78% reduction |

---

## Table of Contents

1. [System Architecture](#1-system-architecture)
2. [Hybrid Memory Architecture](#2-hybrid-memory-architecture)
3. [Component Integration](#3-component-integration)
4. [Data Migration Strategy](#4-data-migration-strategy)
5. [API Design](#5-api-design)
6. [Performance Optimization](#6-performance-optimization)
7. [Implementation Phases](#7-implementation-phases)
8. [Testing Strategy](#8-testing-strategy)

---

## 1. System Architecture

### 1.1 High-Level Architecture Diagram

```
┌─────────────────────────────────────────────────────────────────┐
│ Claude Desktop/Code                                             │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ MCP-Standards v2 Server                                   │  │
│  │                                                           │  │
│  │  ┌─────────────────────────────────────────────────────┐  │  │
│  │  │ Memory Router (Query Dispatcher)                    │  │  │
│  │  │                                                     │  │  │
│  │  │  • Semantic queries → AgentDB                       │  │  │
│  │  │  • Audit queries    → SQLite                        │  │  │
│  │  │  • Hybrid queries   → Both (merge results)          │  │  │
│  │  └──────────────┬───────────────────┬──────────────────┘  │  │
│  │                 │                   │                     │  │
│  │  ┌──────────────▼─────┐   ┌─────────▼─────────┐           │  │
│  │  │ AgentDB Layer      │   │ SQLite Layer      │           │  │
│  │  │ (Hot Path)         │   │ (Cold Path)       │           │  │
│  │  │                    │   │                   │           │  │
│  │  │ • HNSW Graph       │   │ • Audit Logs      │           │  │
│  │  │ • Vector Store     │   │ • Metadata        │           │  │
│  │  │ • Pattern Clusters │   │ • Full History    │           │  │
│  │  │ • <10ms startup    │   │ • Compliance      │           │  │
│  │  │ • <1ms search      │   │ • Temporal Graph  │           │  │
│  │  └────────────────────┘   └───────────────────┘           │  │
│  │                                                           │  │
│  │  ┌─────────────────────────────────────────────────────┐  │  │
│  │  │ Intelligence Layer (Enhanced)                       │  │  │
│  │  │                                                     │  │  │
│  │  │  ┌─────────────────┐   ┌──────────────────────┐     │  │  │
│  │  │  │ Pattern         │   │ ReasoningBank        │     │  │  │
│  │  │  │ Extractor       │   │ (Outcome Tracker)    │     │  │  │
│  │  │  │ (Semantic)      │   │                      │     │  │  │
│  │  │  └─────────────────┘   └──────────────────────┘     │  │  │
│  │  │                                                     │  │  │
│  │  │  ┌─────────────────┐   ┌──────────────────────┐     │  │  │
│  │  │  │ CLAUDE.md       │   │ Event Bus            │     │  │  │
│  │  │  │ Manager         │   │ (Event-Driven)       │     │  │  │
│  │  │  │ (Auto-Update)   │   │                      │     │  │  │
│  │  │  └─────────────────┘   └──────────────────────┘     │  │  │
│  │  └─────────────────────────────────────────────────────┘  │  │
│  │                                                           │  │
│  │  ┌─────────────────────────────────────────────────────┐  │  │
│  │  │ Event-Driven Automation                             │  │  │
│  │  │                                                     │  │  │
│  │  │  • Config File Watcher (inotify/FSEvents)           │  │  │
│  │  │  • Pattern Promotion Monitor (background job)       │  │  │
│  │  │  • Proactive Suggester (MCP notifications)          │  │  │
│  │  │  • Diff-Based Learner (CLAUDE.md edits)             │  │  │
│  │  └─────────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
```

### 1.2 Component Responsibilities

| Component | Responsibility | Performance Target |
|-----------|---------------|-------------------|
| **Memory Router** | Query dispatch, result merging | <100μs routing overhead |
| **AgentDB Layer** | Vector search, semantic matching, pattern clustering | <1ms search, <10ms startup |
| **SQLite Layer** | Audit logs, metadata, compliance, full history | 50ms acceptable (cold path) |
| **Pattern Extractor** | Extract patterns with semantic embeddings | <5ms per pattern |
| **ReasoningBank** | Track outcomes, adjust confidence | Async background |
| **CLAUDE.md Manager** | Auto-generate/update config files | Event-driven (on-demand) |
| **Event Bus** | Event routing, notification dispatch | <10ms event delivery |
| **File Watcher** | Monitor config file changes | Real-time (inotify) |

---

## 2. Hybrid Memory Architecture

### 2.1 Memory Router Design

The Memory Router is the core abstraction that intelligently routes queries to either AgentDB (hot path) or SQLite (cold path) based on query type.

```python
# src/mcp_standards/memory/router.py
from typing import Any, Dict, List, Optional, Union
from enum import Enum
import asyncio
from pathlib import Path


class QueryType(Enum):
    """Query type classification."""
    SEMANTIC = "semantic"   # Vector similarity search
    EXACT = "exact"         # Key-value lookup
    AUDIT = "audit"         # Audit trail query
    TEMPORAL = "temporal"   # Temporal knowledge graph
    HYBRID = "hybrid"       # Requires both stores


class MemoryRouter:
    """
    Intelligent query router for hybrid AgentDB + SQLite architecture.
    Routes queries to optimal storage backend:
    - Semantic queries → AgentDB (HNSW vector search)
    - Audit/compliance → SQLite (full history)
    - Hybrid queries → Both (merge results)
    """

    def __init__(
        self,
        agentdb_path: Path,
        sqlite_path: Path,
        embedding_model: str = "all-MiniLM-L6-v2"
    ):
        from .agentdb_adapter import AgentDBAdapter
        from .sqlite_adapter import SQLiteAdapter

        # Initialize storage backends
        self.agentdb = AgentDBAdapter(
            db_path=agentdb_path,
            embedding_model=embedding_model,
            dimension=384
        )
        self.sqlite = SQLiteAdapter(db_path=sqlite_path)

        # Sync layer keeps both stores updated
        self._sync_enabled = True

    async def store_pattern(
        self,
        pattern_key: str,
        pattern_data: Dict[str, Any],
        metadata: Optional[Dict] = None,
        confidence: float = 0.1,
        namespace: str = "patterns"
    ) -> bool:
        """
        Store pattern in both AgentDB (vector) and SQLite (metadata).

        AgentDB: Embedding for semantic search
        SQLite: Full metadata, audit trail, temporal tracking
        """
        # Store in AgentDB with vector embedding
        agentdb_success = await self.agentdb.store(
            key=pattern_key,
            value=pattern_data,
            namespace=namespace,
            metadata={
                **(metadata or {}),  # Guard against metadata=None
                "confidence": confidence,
                "stored_at": "agentdb"
            }
        )

        # Store metadata in SQLite
        sqlite_success = await self.sqlite.store_metadata(
            pattern_key=pattern_key,
            pattern_data=pattern_data,
            metadata=metadata,
            confidence=confidence
        )

        return agentdb_success and sqlite_success

    async def search_patterns(
        self,
        query: str,
        query_type: QueryType = QueryType.SEMANTIC,
        top_k: int = 5,
        namespace: Optional[str] = None,
        filters: Optional[Dict] = None,
        min_confidence: float = 0.7
    ) -> List[Dict[str, Any]]:
        """
        Search patterns using query type-specific routing.

        SEMANTIC: AgentDB vector search
        EXACT: SQLite key-value lookup
        AUDIT: SQLite audit trail
        HYBRID: Merge results from both
        """
        if query_type == QueryType.SEMANTIC:
            # Route to AgentDB for vector similarity search
            return await self.agentdb.search(
                query=query,
                top_k=top_k,
                namespace=namespace,
                threshold=min_confidence
            )

        elif query_type == QueryType.EXACT:
            # Route to SQLite for exact key lookup
            return await self.sqlite.retrieve(
                key=query,
                namespace=namespace,
                filters=filters
            )

        elif query_type == QueryType.AUDIT:
            # Route to SQLite audit log
            return await self.sqlite.query_audit_log(
                filters=filters,
                limit=top_k
            )

        elif query_type == QueryType.TEMPORAL:
            # Route to SQLite temporal graph
            return await self.sqlite.query_temporal_graph(
                pattern_key=query,
                filters=filters
            )

        elif query_type == QueryType.HYBRID:
            # Query both stores in parallel and merge
            semantic_results, metadata_results = await asyncio.gather(
                self.agentdb.search(query, top_k, namespace, min_confidence),
                self.sqlite.retrieve(query, namespace, filters)
            )
            return self._merge_results(semantic_results, metadata_results)

        else:
            raise ValueError(f"Unknown query type: {query_type}")

    def _merge_results(
        self,
        semantic_results: List[Dict],
        metadata_results: List[Dict]
    ) -> List[Dict]:
        """
        Merge results from AgentDB and SQLite.

        Strategy:
        1. Use semantic results as base (sorted by similarity)
        2. Enrich with metadata from SQLite
        3. Handle missing entries gracefully
        """
        merged = []

        # Create lookup map for metadata
        metadata_map = {
            result['key']: result
            for result in metadata_results
        }

        for sem_result in semantic_results:
            key = sem_result['key']

            # Enrich with metadata if available
            if key in metadata_map:
                merged_entry = {
                    **sem_result,         # Vector search results (similarity, value)
                    **metadata_map[key],  # SQLite metadata (full history, audit)
                    'sources': ['agentdb', 'sqlite']
                }
            else:
                merged_entry = {
                    **sem_result,
                    'sources': ['agentdb']
                }

            merged.append(merged_entry)

        return merged

    async def get_stats(self) -> Dict[str, Any]:
        """Get statistics from both storage layers."""
        agentdb_stats, sqlite_stats = await asyncio.gather(
            self.agentdb.get_stats(),
            self.sqlite.get_stats()
        )

        return {
            'agentdb': agentdb_stats,
            'sqlite': sqlite_stats,
            'router': {
                'sync_enabled': self._sync_enabled,
                'query_types': [qt.value for qt in QueryType]
            }
        }
```

### 2.2 AgentDB Adapter

```python
# src/mcp_standards/memory/agentdb_adapter.py
import asyncio
import numpy as np
from typing import Any, Dict, List, Optional
from pathlib import Path
import logging

logger = logging.getLogger(__name__)


class AgentDBAdapter:
    """
    Adapter for AgentDB ultra-fast vector memory.
    Features:
    - HNSW graph for 116x faster similarity search
    - <10ms startup time (disk mode)
    - <1ms search queries
    - Automatic embedding generation
    """

    def __init__(
        self,
        db_path: Path,
        embedding_model: str = "all-MiniLM-L6-v2",
        dimension: int = 384,
        max_elements: int = 100_000
    ):
        self.db_path = db_path
        self.dimension = dimension

        # Use existing EmbeddingManager from v1
        from ...intelligence.memory.embeddings import EmbeddingManager
        self.embedder = EmbeddingManager(model_name=embedding_model)

        # Initialize AgentDB (note: this is pseudocode, the actual API may differ).
        # AgentDB is a separate package installed via npm/npx; we interact with it
        # via subprocess or native Python bindings.
        self._init_agentdb(max_elements)

        logger.info(f"AgentDB initialized at {db_path} (dim={dimension})")

    def _init_agentdb(self, max_elements: int):
        """
        Initialize AgentDB instance.

        AgentDB Setup Options:
        1. Use npx agentdb (zero-install mode)
        2. Use native Python bindings (if available)
        3. Use HTTP API (for remote AgentDB)

        For v2, we'll use approach #2 with fallback to #1.
        """
        try:
            # Option 1: Try to import native Python bindings
            # (This is aspirational - may need to implement via subprocess)
            import agentdb

            self.db = agentdb.VectorStore(
                path=str(self.db_path),
                dimension=self.dimension,
                max_elements=max_elements,
                ef_construction=200,  # HNSW build-time parameter
                M=16,                 # HNSW connections per element
                mode='disk'           # or 'memory' for <10ms startup
            )
            logger.info("Using native AgentDB bindings")

        except ImportError:
            # Option 2: Fallback to subprocess wrapper
            logger.warning("Native AgentDB bindings not available, using subprocess")
            self.db = AgentDBSubprocessWrapper(
                db_path=self.db_path,
                dimension=self.dimension,
                max_elements=max_elements
            )

    async def store(
        self,
        key: str,
        value: Any,
        namespace: str = "global",
        metadata: Optional[Dict] = None
    ) -> bool:
        """
        Store value with automatic embedding generation.

        Returns:
            Success status
        """
        try:
            # Generate embedding
            embedding = await self._generate_embedding(value)

            # Store in AgentDB with metadata
            await self.db.add(
                id=f"{namespace}:{key}",
                vector=embedding.tolist(),
                metadata={
                    'key': key,
                    'namespace': namespace,
                    'value': self._serialize_value(value),
                    **(metadata or {})
                }
            )
            return True

        except Exception as e:
            logger.error(f"AgentDB store failed: {e}")
            return False

    async def search(
        self,
        query: str,
        top_k: int = 5,
        namespace: Optional[str] = None,
        threshold: float = 0.7
    ) -> List[Dict[str, Any]]:
        """
        Semantic search using HNSW vector similarity.

        Performance target: <1ms
        """
        try:
            # Generate query embedding
            query_embedding = await self._generate_embedding(query)

            # Search AgentDB HNSW graph
            results = await self.db.search(
                query_vector=query_embedding.tolist(),
                k=top_k,
                filter={"namespace": namespace} if namespace else None
            )

            # Transform results
            transformed = []
            for result in results:
                if result['score'] >= threshold:
                    transformed.append({
                        'key': result['metadata']['key'],
                        'namespace': result['metadata']['namespace'],
                        'value': self._deserialize_value(
                            result['metadata']['value']
                        ),
                        'similarity': float(result['score']),
                        'metadata': result['metadata']
                    })

            return transformed

        except Exception as e:
            logger.error(f"AgentDB search failed: {e}")
            return []

    async def _generate_embedding(self, value: Any) -> np.ndarray:
        """Generate embedding for a value."""
        # Convert value to text
        if isinstance(value, dict):
            text = self._extract_text_from_dict(value)
        else:
            text = str(value)

        # Use existing embedder from v1
        embedding = self.embedder.encode(text, normalize=True)
        return embedding[0] if embedding.ndim > 1 else embedding

    def _extract_text_from_dict(self, data: Dict) -> str:
        """Extract meaningful text from dict for embedding."""
        # Priority fields for embedding
        fields = ['description', 'preference', 'pattern_description',
                  'content', 'message']

        parts = []
        for field in fields:
            if field in data and data[field]:
                parts.append(str(data[field]))

        return " | ".join(parts) if parts else str(data)

    def _serialize_value(self, value: Any) -> str:
        """Serialize value for storage."""
        import json
        return json.dumps(value, default=str)

    def _deserialize_value(self, value_str: str) -> Any:
        """Deserialize value from storage."""
        import json
        try:
            return json.loads(value_str)
        except json.JSONDecodeError:
            return value_str

    async def get_stats(self) -> Dict[str, Any]:
        """Get AgentDB statistics."""
        return {
            'total_vectors': await self.db.count(),
            'dimension': self.dimension,
            'mode': 'disk',
            'hnsw_enabled': True
        }


class AgentDBSubprocessWrapper:
    """
    Wrapper for AgentDB when native bindings are unavailable.
    Uses subprocess to call npx agentdb CLI commands.

    This is a fallback - native bindings are preferred for performance.
    """

    def __init__(self, db_path: Path, dimension: int, max_elements: int):
        self.db_path = db_path
        self.dimension = dimension
        # Implementation details for subprocess wrapper...

    # Implement same interface as native AgentDB...
```

### 2.3 SQLite Adapter (Enhanced)

```python
# src/mcp_standards/memory/sqlite_adapter.py
import sqlite3
import json
import logging
from typing import Any, Dict, List, Optional
from pathlib import Path
from datetime import datetime

logger = logging.getLogger(__name__)


class SQLiteAdapter:
    """
    Enhanced SQLite adapter for audit logs, metadata, and temporal tracking.
    Responsibilities:
    - Audit trail (compliance, security)
    - Full pattern history (metadata)
    - Temporal knowledge graph
    - Pattern frequency tracking
    """

    def __init__(self, db_path: Path):
        self.db_path = db_path
        self._ensure_schema()

    def _ensure_schema(self):
        """Ensure all tables exist (v1 + v2 tables)."""
        with sqlite3.connect(self.db_path) as conn:
            # Existing v1 tables (keep for backward compatibility)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS pattern_frequency (
                    id INTEGER PRIMARY KEY,
                    pattern_key TEXT NOT NULL UNIQUE,
                    tool_name TEXT NOT NULL,
                    pattern_type TEXT,
                    pattern_description TEXT,
                    occurrence_count INTEGER DEFAULT 1,
                    first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    promoted_to_preference BOOLEAN DEFAULT FALSE,
                    confidence REAL DEFAULT 0.0,
                    examples TEXT,
                    agentdb_synced BOOLEAN DEFAULT FALSE  -- v2: sync flag
                )
            """)

            # v2: Reasoning bank for outcome tracking
            conn.execute("""
                CREATE TABLE IF NOT EXISTS reasoning_episodes (
                    id INTEGER PRIMARY KEY,
                    pattern_id TEXT NOT NULL,
                    context TEXT,
                    action_taken TEXT,
                    outcome TEXT,  -- 'success', 'failure', 'partial'
                    confidence_before REAL,
                    confidence_after REAL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    metadata TEXT,
                    FOREIGN KEY (pattern_id) REFERENCES pattern_frequency(pattern_key)
                )
            """)

            # v2: Sync metadata (track AgentDB sync status)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS sync_metadata (
                    id INTEGER PRIMARY KEY,
                    table_name TEXT NOT NULL,
                    record_key TEXT NOT NULL,
                    last_synced TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    sync_status TEXT,  -- 'pending', 'synced', 'failed'
                    error_message TEXT,
                    UNIQUE(table_name, record_key)
                )
            """)

            conn.commit()

    async def store_metadata(
        self,
        pattern_key: str,
        pattern_data: Dict[str, Any],
        metadata: Optional[Dict] = None,
        confidence: float = 0.1
    ) -> bool:
        """Store pattern metadata (complements AgentDB vector)."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute("""
                    INSERT OR REPLACE INTO pattern_frequency (
                        pattern_key, tool_name, pattern_type, pattern_description,
                        confidence, examples, agentdb_synced
                    ) VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    pattern_key,
                    pattern_data.get('tool_name', ''),
                    pattern_data.get('type', ''),
                    pattern_data.get('description', ''),
                    confidence,
                    json.dumps(metadata or {}),
                    True  # Mark as synced to AgentDB
                ))
                conn.commit()
            return True

        except Exception as e:
            logger.error(f"SQLite metadata store failed: {e}")
            return False

    async def retrieve(
        self,
        key: str,
        namespace: Optional[str] = None,
        filters: Optional[Dict] = None
    ) -> List[Dict[str, Any]]:
        """Retrieve patterns by exact key match."""
        # Implementation similar to existing pattern_extractor...
        pass

    async def query_audit_log(
        self,
        filters: Optional[Dict] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Query audit log for compliance/security."""
        # Implementation using existing audit_log table...
        pass

    async def query_temporal_graph(
        self,
        pattern_key: str,
        filters: Optional[Dict] = None
    ) -> List[Dict[str, Any]]:
        """Query temporal knowledge graph (existing v1 feature)."""
        # Delegate to existing temporal_graph.py...
        pass

    async def get_stats(self) -> Dict[str, Any]:
        """Get SQLite statistics."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT
                    COUNT(*) as total_patterns,
                    SUM(CASE WHEN agentdb_synced THEN 1 ELSE 0 END) as synced,
                    AVG(confidence) as avg_confidence
                FROM pattern_frequency
            """)
            row = cursor.fetchone()

            return {
                'total_patterns': row[0],
                'agentdb_synced': row[1],
                'avg_confidence': row[2]
            }
```

---

## 3. Component Integration

### 3.1 Pattern Extractor Integration (Modified)

The existing `pattern_extractor.py` will be modified to store embeddings in AgentDB while maintaining SQLite for metadata.

```python
# src/mcp_standards/hooks/pattern_extractor.py (modified)
# Imports needed by the v2 additions shown below
import asyncio
import json
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from ..memory.router import QueryType


class PatternExtractor:
    """
    Enhanced pattern extractor with semantic embeddings.
    v1: Stored patterns only in SQLite
    v2: Stores vectors in AgentDB + metadata in SQLite
    """

    def __init__(self, db_path: Path, memory_router: Optional['MemoryRouter'] = None):
        self.db_path = db_path

        # v2: Use memory router instead of direct SQLite
        if memory_router:
            self.memory = memory_router
        else:
            # Fallback: Initialize router automatically
            from ..memory.router import MemoryRouter
            self.memory = MemoryRouter(
                agentdb_path=db_path.parent / "agentdb",
                sqlite_path=db_path
            )

        # Keep existing rate limiting
        self._pattern_timestamps = []

    def extract_patterns(
        self,
        tool_name: str,
        args: Dict[str, Any],
        result: Any,
        project_path: str = ""
    ) -> List[Dict[str, Any]]:
        """
        Extract patterns with semantic embeddings (v2 enhancement).

        Changes from v1:
        - Now stores in both AgentDB (vectors) and SQLite (metadata)
        - Semantic clustering reduces promotion threshold from 3 → 2
        """
        # Rate limiting check (unchanged from v1)
        if not self._check_rate_limit():
            return []

        patterns = []

        # 1. Detect patterns (unchanged logic from v1)
        correction_patterns = self._detect_corrections(tool_name, args, result)
        patterns.extend(correction_patterns)

        workflow_patterns = self._detect_workflow_patterns(tool_name, args, project_path)
        patterns.extend(workflow_patterns)

        tool_prefs = self._detect_tool_preferences(tool_name, args, result)
        patterns.extend(tool_prefs)

        # 2. Store patterns in hybrid memory (v2 change)
        # Note: create_task requires a running event loop (the MCP server's loop)
        for pattern in patterns:
            asyncio.create_task(
                self._store_pattern_hybrid(pattern, project_path)
            )

        # 3. Check semantic clustering for early promotion (v2 change)
        asyncio.create_task(
            self._check_semantic_promotion(patterns)
        )

        return patterns

    async def _store_pattern_hybrid(
        self,
        pattern: Dict[str, Any],
        project_path: str
    ) -> None:
        """
        Store pattern in both AgentDB and SQLite.

        v2 enhancement: Hybrid storage for semantic + metadata
        """
        pattern_key = pattern["pattern_key"]

        await self.memory.store_pattern(
            pattern_key=pattern_key,
            pattern_data=pattern,
            metadata={
                "project_path": project_path,
                "detected_at": datetime.now().isoformat()
            },
            confidence=pattern.get("confidence", 0.1)
        )

    async def _check_semantic_promotion(
        self,
        new_patterns: List[Dict[str, Any]]
    ) -> None:
        """
        v2 enhancement: Semantic clustering for early promotion.

        Instead of requiring 3 exact matches, we now:
        1. Find semantically similar patterns (>0.85 similarity)
        2. Promote if total cluster size >= 2 (reduced from 3)
        """
        SIMILARITY_THRESHOLD = 0.85
        PROMOTION_CLUSTER_SIZE = 2  # Reduced from 3

        for pattern in new_patterns:
            # Find semantically similar patterns
            similar = await self.memory.search_patterns(
                query=pattern['description'],
                query_type=QueryType.SEMANTIC,
                top_k=10,
                min_confidence=SIMILARITY_THRESHOLD
            )

            # Check cluster size
            if len(similar) >= PROMOTION_CLUSTER_SIZE:
                await self._promote_pattern_cluster(pattern, similar)

    async def _promote_pattern_cluster(
        self,
        pattern: Dict[str, Any],
        similar_patterns: List[Dict]
    ) -> None:
        """
        Promote a cluster of semantically similar patterns.

        v2: Combines similar patterns into a single preference
        """
        # Calculate cluster confidence (average of similar patterns)
        cluster_confidence = (
            sum(p['similarity'] for p in similar_patterns) / len(similar_patterns)
        )

        # Create preference entry (existing v1 logic)
        category = pattern.get('category', 'general')

        # Store in SQLite tool_preferences
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO tool_preferences (
                    category, context, preference, confidence,
                    examples, learned_from, created_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                category,
                f"Learned from {len(similar_patterns)} similar patterns (semantic)",
                pattern['description'],
                cluster_confidence,
                json.dumps([p['key'] for p in similar_patterns]),
                f"semantic_cluster:{pattern['pattern_key']}",
                datetime.now().isoformat()
            ))
            conn.commit()
```

### 3.2 CLAUDE.md Manager Integration (Event-Driven)

```python
# src/mcp_standards/intelligence/claudemd_manager.py (enhanced)
import asyncio
from pathlib import Path
from typing import Dict, List, Optional

from ..events.event_bus import EventBus, Event
from ..memory.router import MemoryRouter, QueryType


class ClaudeMdManager:
    """
    Enhanced CLAUDE.md manager with event-driven updates.

    v1: Manual trigger via update_claudemd() MCP tool
    v2: Automatic updates on pattern promotion events
    """

    def __init__(
        self,
        db_path: Path,
        memory_router: MemoryRouter,
        event_bus: EventBus
    ):
        self.db_path = db_path
        self.memory = memory_router
        self.events = event_bus

        # Subscribe to pattern promotion events
        self.events.subscribe("pattern_promoted", self._on_pattern_promoted)
        self.events.subscribe("config_changed", self._on_config_changed)

    async def _on_pattern_promoted(self, event: Event):
        """
        Handle pattern promotion event (v2 enhancement).

        Automatically updates CLAUDE.md when patterns are promoted.
""" pattern_data = event.data # Determine which CLAUDE.md to update project_path = pattern_data.get('project_path') if project_path: # Update project-specific CLAUDE.md claudemd_path = Path(project_path) / "CLAUDE.md" else: # Update global CLAUDE.md claudemd_path = Path.home() / ".claude" / "CLAUDE.md" # Generate updated content using semantic search updated_content = await self._generate_content_semantic( project_path=project_path, min_confidence=0.7 ) # Update file with smart merge (existing v1 logic) success, message = await self.update_claudemd_file( file_path=claudemd_path, content=updated_content ) # Emit notification event if success: await self.events.emit(Event( type="claudemd_updated", data={ "file_path": str(claudemd_path), "auto_updated": True, "trigger": "pattern_promotion" } )) async def _generate_content_semantic( self, project_path: Optional[str] = None, min_confidence: float = 0.7 ) -> str: """ Generate CLAUDE.md content using semantic search (v2 enhancement). v1: Queried SQLite directly v2: Uses AgentDB semantic search for better grouping """ # Search for high-confidence patterns semantically patterns = await self.memory.search_patterns( query="learned preferences and workflow patterns", query_type=QueryType.HYBRID, # Use both AgentDB + SQLite top_k=50, min_confidence=min_confidence ) # Group patterns by semantic similarity pattern_clusters = self._cluster_patterns_semantically(patterns) # Generate markdown sections sections = self._format_pattern_clusters(pattern_clusters) return self._build_markdown_template(sections, project_path) def _cluster_patterns_semantically( self, patterns: List[Dict] ) -> Dict[str, List[Dict]]: """ v2 enhancement: Cluster patterns by semantic similarity. Groups related patterns together for better CLAUDE.md organization. 
""" # Use simple similarity-based clustering clusters = {} for pattern in patterns: # Find best matching cluster best_cluster = None best_similarity = 0.0 for cluster_name, cluster_patterns in clusters.items(): # Calculate similarity to cluster centroid similarity = self._calculate_cluster_similarity( pattern, cluster_patterns ) if similarity > best_similarity: best_similarity = similarity best_cluster = cluster_name # Assign to cluster or create new one if best_cluster and best_similarity > 0.75: clusters[best_cluster].append(pattern) else: # Create new cluster cluster_name = pattern.get('category', 'general') if cluster_name not in clusters: clusters[cluster_name] = [] clusters[cluster_name].append(pattern) return clusters ``` ### 3.3 Event Bus Implementation ```python # src/mcp_standards/events/event_bus.py import asyncio from typing import Callable, Dict, List, Optional from dataclasses import dataclass from datetime import datetime import logging logger = logging.getLogger(__name__) @dataclass class Event: """Event data structure.""" type: str data: Dict timestamp: datetime = None source: str = "mcp-standards" def __post_init__(self): if self.timestamp is None: self.timestamp = datetime.now() class EventBus: """ Event bus for decoupled component communication. 
    Events:
    - pattern_promoted: When pattern reaches promotion threshold
    - config_changed: When config files (.editorconfig, etc) change
    - claudemd_updated: When CLAUDE.md is auto-updated
    - reasoning_outcome: When ReasoningBank records outcome
    """

    def __init__(self):
        self._subscribers: Dict[str, List[Callable]] = {}
        self._event_queue = asyncio.Queue()
        self._running = False

    def subscribe(self, event_type: str, callback: Callable):
        """Subscribe to event type."""
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        self._subscribers[event_type].append(callback)
        logger.info(f"Subscribed to event: {event_type}")

    async def emit(self, event: Event):
        """Emit event to subscribers."""
        await self._event_queue.put(event)

    async def start(self):
        """Start event processing loop."""
        self._running = True
        while self._running:
            try:
                event = await asyncio.wait_for(
                    self._event_queue.get(),
                    timeout=1.0
                )
                await self._process_event(event)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"Event processing error: {e}")

    async def _process_event(self, event: Event):
        """Process event by calling subscribers."""
        if event.type in self._subscribers:
            # Call all subscribers in parallel
            tasks = [
                asyncio.create_task(callback(event))
                for callback in self._subscribers[event.type]
            ]
            await asyncio.gather(*tasks, return_exceptions=True)

    def stop(self):
        """Stop event processing."""
        self._running = False
```

---

## 4. Data Migration Strategy

### 4.1 Migration Plan: v1 SQLite → v2 Hybrid

```python
# src/mcp_standards/migration/v2_migrator.py
from pathlib import Path
import sqlite3
import asyncio
from typing import Dict, List
import logging

logger = logging.getLogger(__name__)


class V2Migrator:
    """
    Migrate v1 SQLite-only data to v2 hybrid AgentDB + SQLite.

    Migration Steps:
    1. Backup v1 database
    2. Create v2 schema (add new tables)
    3. Migrate patterns to AgentDB (generate embeddings)
    4. Update SQLite with sync metadata
    5. Verify data integrity
    """

    def __init__(self, v1_db_path: Path):
        self.v1_db_path = v1_db_path
        self.v2_db_path = v1_db_path.parent / "knowledge_v2.db"
        self.agentdb_path = v1_db_path.parent / "agentdb"

    async def migrate(self) -> bool:
        """
        Execute migration from v1 to v2.

        Returns:
            Success status
        """
        try:
            # Step 1: Backup v1 database
            await self._backup_v1_database()

            # Step 2: Create v2 schema
            await self._create_v2_schema()

            # Step 3: Migrate pattern data
            await self._migrate_patterns()

            # Step 4: Migrate tool preferences
            await self._migrate_preferences()

            # Step 5: Verify migration
            await self._verify_migration()

            logger.info("Migration completed successfully")
            return True

        except Exception as e:
            logger.error(f"Migration failed: {e}")
            await self._rollback()
            return False

    async def _backup_v1_database(self):
        """Create backup of v1 database."""
        import shutil
        from datetime import datetime

        backup_path = self.v1_db_path.with_suffix(
            f".backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
        )
        shutil.copy2(self.v1_db_path, backup_path)
        logger.info(f"Backup created: {backup_path}")

    async def _create_v2_schema(self):
        """Create v2 schema additions."""
        with sqlite3.connect(self.v2_db_path) as conn:
            # Copy v1 schema
            v1_conn = sqlite3.connect(self.v1_db_path)
            for line in v1_conn.iterdump():
                if line not in ('BEGIN;', 'COMMIT;'):
                    conn.execute(line)
            v1_conn.close()

            # Add v2 tables
            conn.execute("""
                CREATE TABLE IF NOT EXISTS reasoning_episodes (
                    id INTEGER PRIMARY KEY,
                    pattern_id TEXT NOT NULL,
                    context TEXT,
                    action_taken TEXT,
                    outcome TEXT,
                    confidence_before REAL,
                    confidence_after REAL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    metadata TEXT
                )
            """)

            # Add sync metadata column to existing table
            conn.execute("""
                ALTER TABLE pattern_frequency
                ADD COLUMN agentdb_synced BOOLEAN DEFAULT FALSE
            """)

            conn.commit()

    async def _migrate_patterns(self):
        """
        Migrate patterns to AgentDB with embeddings.

        This is the core migration step - generates embeddings
        for all existing patterns and stores in AgentDB.
""" from ..memory.router import MemoryRouter # Initialize memory router router = MemoryRouter( agentdb_path=self.agentdb_path, sqlite_path=self.v2_db_path ) # Read patterns from v1 database with sqlite3.connect(self.v1_db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.execute(""" SELECT * FROM pattern_frequency """) patterns = [dict(row) for row in cursor.fetchall()] logger.info(f"Migrating {len(patterns)} patterns to AgentDB...") # Migrate patterns in batches BATCH_SIZE = 100 migrated_count = 0 for i in range(0, len(patterns), BATCH_SIZE): batch = patterns[i:i + BATCH_SIZE] # Store batch in parallel tasks = [ router.store_pattern( pattern_key=p['pattern_key'], pattern_data={ 'type': p['pattern_type'], 'description': p['pattern_description'], 'tool_name': p['tool_name'] }, metadata={ 'occurrence_count': p['occurrence_count'], 'first_seen': p['first_seen'], 'last_seen': p['last_seen'] }, confidence=p['confidence'] ) for p in batch ] results = await asyncio.gather(*tasks) migrated_count += sum(results) # Update progress logger.info(f"Migrated {migrated_count}/{len(patterns)} patterns") logger.info(f"Pattern migration complete: {migrated_count} patterns") async def _verify_migration(self): """Verify migration data integrity.""" # Count patterns in v1 vs v2 v1_count = self._count_patterns(self.v1_db_path) v2_count = self._count_patterns(self.v2_db_path) if v1_count != v2_count: raise Exception( f"Migration verification failed: " f"v1={v1_count}, v2={v2_count}" ) logger.info(f"Verification passed: {v2_count} patterns migrated") def _count_patterns(self, db_path: Path) -> int: """Count patterns in database.""" with sqlite3.connect(db_path) as conn: cursor = conn.execute( "SELECT COUNT(*) FROM pattern_frequency" ) return cursor.fetchone()[0] ``` ### 4.2 Migration CLI ```python # src/mcp_standards/cli/migrate.py import asyncio import sys from pathlib import Path import click @click.command() @click.option('--db-path', type=click.Path(exists=True), help='Path 
to v1 database (default: ~/.mcp-standards/knowledge.db)') @click.option('--dry-run', is_flag=True, help='Simulate migration without applying changes') def migrate_v2(db_path: str, dry_run: bool): """Migrate mcp-standards v1 database to v2 hybrid architecture.""" if not db_path: db_path = Path.home() / ".mcp-standards" / "knowledge.db" else: db_path = Path(db_path) if not db_path.exists(): click.echo(f"Error: Database not found: {db_path}", err=True) sys.exit(1) click.echo(f"Migrating database: {db_path}") click.echo(f"Dry run: {dry_run}") from ..migration.v2_migrator import V2Migrator migrator = V2Migrator(v1_db_path=db_path) if dry_run: click.echo("Dry run - no changes will be made") # TODO: Implement dry run analysis else: # Run migration success = asyncio.run(migrator.migrate()) if success: click.echo("✅ Migration completed successfully!") else: click.echo("❌ Migration failed. Check logs for details.", err=True) sys.exit(1) if __name__ == '__main__': migrate_v2() ``` --- ## 5. API Design ### 5.1 New MCP Tools (v2 Additions) ```python # src/mcp_standards/server.py (additions) # Add to list_tools(): Tool( name="semantic_search_patterns", description="Search patterns using semantic similarity (AgentDB vector search)", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Natural language search query" }, "top_k": { "type": "integer", "description": "Number of results", "default": 5 }, "min_confidence": { "type": "number", "description": "Minimum confidence threshold", "default": 0.7 }, "namespace": { "type": "string", "description": "Filter by namespace (optional)" } }, "required": ["query"] } ), Tool( name="cluster_related_patterns", description="Find semantically related patterns and cluster them", inputSchema={ "type": "object", "properties": { "pattern_key": { "type": "string", "description": "Pattern to find related items for" }, "similarity_threshold": { "type": "number", "description": "Minimum similarity score", 
"default": 0.75 } }, "required": ["pattern_key"] } ), Tool( name="record_reasoning_outcome", description="Record outcome of applying a pattern (for ReasoningBank learning)", inputSchema={ "type": "object", "properties": { "pattern_id": { "type": "string", "description": "Pattern that was applied" }, "outcome": { "type": "string", "enum": ["success", "failure", "partial"], "description": "Outcome of applying pattern" }, "context": { "type": "object", "description": "Context in which pattern was applied" } }, "required": ["pattern_id", "outcome"] } ), Tool( name="get_pattern_success_rate", description="Get success rate statistics for a pattern (from ReasoningBank)", inputSchema={ "type": "object", "properties": { "pattern_id": { "type": "string", "description": "Pattern to get stats for" } }, "required": ["pattern_id"] } ), Tool( name="predict_next_correction", description="Predict likely next correction based on current context (experimental)", inputSchema={ "type": "object", "properties": { "context": { "type": "string", "description": "Current context or action" }, "top_k": { "type": "integer", "description": "Number of predictions", "default": 3 } }, "required": ["context"] } ) ``` ### 5.2 MCP Tool Handlers ```python # src/mcp_standards/server.py (handler implementations) async def _semantic_search_patterns( self, query: str, top_k: int = 5, min_confidence: float = 0.7, namespace: Optional[str] = None ) -> Dict[str, Any]: """ Semantic search using AgentDB vector similarity. Example: query: "prefer uv package manager" Returns: ["use uv not pip", "always use uv", ...] 
""" results = await self.memory_router.search_patterns( query=query, query_type=QueryType.SEMANTIC, top_k=top_k, namespace=namespace, min_confidence=min_confidence ) return { "success": True, "query": query, "results": results, "count": len(results) } async def _cluster_related_patterns( self, pattern_key: str, similarity_threshold: float = 0.75 ) -> Dict[str, Any]: """ Find and cluster semantically related patterns. Uses AgentDB to find similar patterns and groups them. """ # Get pattern description pattern = await self.memory_router.search_patterns( query=pattern_key, query_type=QueryType.EXACT, top_k=1 ) if not pattern: return {"success": False, "error": "Pattern not found"} # Find similar patterns similar = await self.memory_router.search_patterns( query=pattern[0]['value']['description'], query_type=QueryType.SEMANTIC, top_k=20, min_confidence=similarity_threshold ) return { "success": True, "pattern_key": pattern_key, "cluster_size": len(similar), "related_patterns": similar } async def _record_reasoning_outcome( self, pattern_id: str, outcome: str, context: Dict[str, Any] ) -> Dict[str, Any]: """ Record outcome for ReasoningBank learning. Tracks whether applying a pattern succeeded or failed. """ # Store outcome in SQLite reasoning_episodes table with sqlite3.connect(self.db_path) as conn: # Get current confidence cursor = conn.execute(""" SELECT confidence FROM pattern_frequency WHERE pattern_key = ? """, (pattern_id,)) row = cursor.fetchone() confidence_before = row[0] if row else 0.0 # Calculate new confidence based on outcome confidence_after = self._adjust_confidence( confidence_before, outcome ) # Record episode conn.execute(""" INSERT INTO reasoning_episodes ( pattern_id, context, action_taken, outcome, confidence_before, confidence_after ) VALUES (?, ?, ?, ?, ?, ?) 
""", ( pattern_id, json.dumps(context), "pattern_applied", outcome, confidence_before, confidence_after )) # Update pattern confidence conn.execute(""" UPDATE pattern_frequency SET confidence = ? WHERE pattern_key = ? """, (confidence_after, pattern_id)) conn.commit() # Emit event await self.event_bus.emit(Event( type="reasoning_outcome", data={ "pattern_id": pattern_id, "outcome": outcome, "confidence_change": confidence_after - confidence_before } )) return { "success": True, "pattern_id": pattern_id, "outcome": outcome, "confidence_before": confidence_before, "confidence_after": confidence_after } def _adjust_confidence( self, current_confidence: float, outcome: str ) -> float: """ Bayesian confidence adjustment based on outcome. success: +0.1 (up to max 1.0) failure: -0.15 (down to min 0.0) partial: +0.05 """ if outcome == "success": return min(1.0, current_confidence + 0.1) elif outcome == "failure": return max(0.0, current_confidence - 0.15) elif outcome == "partial": return min(1.0, current_confidence + 0.05) else: return current_confidence ``` --- ## 6. Performance Optimization ### 6.1 AgentDB Configuration for <10ms Startup ```python # src/mcp_standards/config/performance.py class AgentDBPerformanceConfig: """ Optimized AgentDB configuration for <10ms startup. 
Based on AgentDB benchmarks: - Memory mode: ~100ms startup (browser) - Disk mode: <10ms startup (Node.js/native) - HNSW graph: 116x faster than naive search """ # HNSW Graph Parameters HNSW_M = 16 # Connections per element (trade-off: speed vs accuracy) HNSW_EF_CONSTRUCTION = 200 # Build-time parameter (higher = better quality) HNSW_EF_SEARCH = 50 # Search-time parameter (higher = better recall) # Memory Configuration MODE = "disk" # 'disk' for <10ms startup, 'memory' for pure speed MAX_ELEMENTS = 100_000 # Maximum vectors to store # Embedding Configuration EMBEDDING_MODEL = "all-MiniLM-L6-v2" # 384 dimensions, fast inference EMBEDDING_DIMENSION = 384 EMBEDDING_BATCH_SIZE = 32 # Batch size for encoding # Search Configuration DEFAULT_TOP_K = 5 # Default number of results MIN_SIMILARITY = 0.7 # Minimum similarity threshold @classmethod def for_startup_speed(cls): """Optimize for <10ms startup (disk mode, small graph).""" return { 'mode': 'disk', 'max_elements': 50_000, 'hnsw_m': 12, # Smaller graph = faster load 'hnsw_ef_construction': 150 } @classmethod def for_search_speed(cls): """Optimize for <1ms search (memory mode, optimized graph).""" return { 'mode': 'memory', 'max_elements': 100_000, 'hnsw_m': 16, 'hnsw_ef_construction': 200, 'hnsw_ef_search': 50 } @classmethod def for_accuracy(cls): """Optimize for search accuracy (larger graph, higher ef).""" return { 'mode': 'memory', 'max_elements': 100_000, 'hnsw_m': 32, # More connections = better accuracy 'hnsw_ef_construction': 400, 'hnsw_ef_search': 100 } ``` ### 6.2 Benchmarking Suite ```python # src/mcp_standards/benchmarks/performance.py import asyncio import time from pathlib import Path import logging logger = logging.getLogger(__name__) class PerformanceBenchmark: """ Benchmark suite for v2 performance targets. 
Targets: - Startup: <10ms - Search: <1ms - Pattern extraction: <5ms - CLAUDE.md generation: <100ms """ def __init__(self, memory_router): self.memory = memory_router async def benchmark_startup(self) -> Dict[str, float]: """ Benchmark startup time. Target: <10ms for AgentDB """ start = time.perf_counter() # Initialize fresh AgentDB instance from ..memory.agentdb_adapter import AgentDBAdapter adapter = AgentDBAdapter( db_path=Path("/tmp/test_agentdb"), embedding_model="all-MiniLM-L6-v2" ) end = time.perf_counter() startup_time_ms = (end - start) * 1000 logger.info(f"Startup time: {startup_time_ms:.2f}ms") return { 'startup_time_ms': startup_time_ms, 'target_ms': 10, 'passed': startup_time_ms < 10 } async def benchmark_search(self, num_queries: int = 100) -> Dict[str, Any]: """ Benchmark search performance. Target: <1ms average search time """ # Load test data await self._load_test_patterns(count=10_000) # Test queries queries = [ "use uv not pip", "run tests after code changes", "update docs when adding features", # ... more test queries ] times = [] for _ in range(num_queries): query = queries[_ % len(queries)] start = time.perf_counter() results = await self.memory.search_patterns( query=query, query_type=QueryType.SEMANTIC, top_k=5 ) end = time.perf_counter() times.append((end - start) * 1000) # Convert to ms avg_time = sum(times) / len(times) p95_time = sorted(times)[int(len(times) * 0.95)] p99_time = sorted(times)[int(len(times) * 0.99)] logger.info(f"Search - Avg: {avg_time:.2f}ms, P95: {p95_time:.2f}ms, P99: {p99_time:.2f}ms") return { 'avg_search_time_ms': avg_time, 'p95_search_time_ms': p95_time, 'p99_search_time_ms': p99_time, 'target_ms': 1, 'passed': avg_time < 1 } async def benchmark_pattern_extraction(self) -> Dict[str, Any]: """ Benchmark pattern extraction with embedding generation. 
Target: <5ms per pattern """ from ..hooks.pattern_extractor import PatternExtractor extractor = PatternExtractor( db_path=Path("/tmp/test.db"), memory_router=self.memory ) # Test pattern extraction test_cases = [ { 'tool_name': 'Bash', 'args': {'command': 'uv pip install pytest'}, 'result': 'success' }, # ... more test cases ] times = [] for case in test_cases: start = time.perf_counter() patterns = extractor.extract_patterns( tool_name=case['tool_name'], args=case['args'], result=case['result'] ) end = time.perf_counter() times.append((end - start) * 1000) avg_time = sum(times) / len(times) logger.info(f"Pattern extraction: {avg_time:.2f}ms") return { 'avg_extraction_time_ms': avg_time, 'target_ms': 5, 'passed': avg_time < 5 } async def run_all_benchmarks(self) -> Dict[str, Any]: """Run complete benchmark suite.""" results = { 'startup': await self.benchmark_startup(), 'search': await self.benchmark_search(), 'pattern_extraction': await self.benchmark_pattern_extraction() } # Overall pass/fail all_passed = all( result['passed'] for result in results.values() ) results['overall_passed'] = all_passed return results ``` --- ## 7. Implementation Phases ### Phase 1: AgentDB Core Integration (Weeks 1-2) **Goal**: Replace SQLite FTS5 with AgentDB HNSW for semantic search. **Tasks**: 1. ✅ Install AgentDB dependency ```bash npm install agentdb # or for Python native bindings (if available) pip install agentdb-python ``` 2. ✅ Create `agentdb_adapter.py` - HNSW configuration - Embedding generation - Vector search interface 3. ✅ Create `memory_router.py` - Query routing logic - Hybrid query support - Result merging 4. ✅ Create `sqlite_adapter.py` - Metadata storage - Audit logging - Temporal queries 5. ✅ Update `pattern_extractor.py` - Use memory router - Store vectors in AgentDB - Semantic clustering logic 6. 
✅ Testing & Benchmarking - Load 10K test patterns - Measure search latency (<1ms target) - Test semantic matching accuracy **Deliverables**: - Working AgentDB integration - <10ms startup time - <1ms search performance - Semantic pattern matching **Success Criteria**: - [ ] AgentDB adapter passes unit tests - [ ] Search latency <1ms (P95) - [ ] Startup time <10ms - [ ] Semantic matching: "use uv" matches "prefer uv package manager" --- ### Phase 2: Event-Driven CLAUDE.md (Week 3) **Goal**: Automatic CLAUDE.md updates with zero manual triggers. **Tasks**: 1. ✅ Create `event_bus.py` - Event subscription/emission - Async event processing - Event types definition 2. ✅ Create `config_watcher.py` - File system monitoring (inotify/FSEvents) - Config file change detection - Event emission on changes 3. ✅ Create `proactive_suggester.py` - Background monitoring job - Pattern promotion detection - MCP notification integration 4. ✅ Update `claudemd_manager.py` - Event subscription - Auto-update on pattern promotion - Semantic content generation 5. ✅ Create `diff_learner.py` - CLAUDE.md backup analysis - Extract user-added preferences - Learn from manual edits **Deliverables**: - Event-driven architecture - Automatic CLAUDE.md updates - Proactive suggestion notifications - Learning from manual edits **Success Criteria**: - [ ] CLAUDE.md auto-updates on pattern promotion - [ ] File watcher detects config changes - [ ] MCP notifications sent for suggestions - [ ] Diff learner extracts manual preferences --- ### Phase 3: ReasoningBank Outcomes (Week 4) **Goal**: Track pattern outcomes and adjust confidence automatically. **Tasks**: 1. ✅ Create `reasoning_bank.py` - Outcome recording - Success rate calculation - Confidence adjustment (Bayesian) 2. ✅ Add `reasoning_episodes` table - Schema definition - Indexes for performance 3. ✅ Integrate with pattern application flow - Auto-detect outcomes - Record successes/failures - Adjust confidence scores 4. 
✅ Create MCP tools - `record_reasoning_outcome` - `get_pattern_success_rate` **Deliverables**: - ReasoningBank implementation - Automatic confidence adjustment - Outcome tracking API **Success Criteria**: - [ ] Outcomes recorded correctly - [ ] Confidence adjusts based on success rate - [ ] Failed patterns demoted automatically - [ ] Successful patterns boosted --- ### Phase 4: Testing & Documentation (Week 5) **Goal**: Production-ready release with comprehensive tests and docs. **Tasks**: 1. ✅ Re-enable test suite - Migrate tests from `tests/_disabled/` - Update for v2 architecture - Add AgentDB integration tests 2. ✅ Create migration tool - CLI for v1 → v2 migration - Data verification - Rollback support 3. ✅ Update documentation - ARCHITECTURE.md - API documentation - Migration guide - Performance tuning guide 4. ✅ Performance testing - Benchmark suite execution - Optimization tuning - Load testing **Deliverables**: - >80% test coverage - Migration tool - Complete documentation - Performance benchmarks **Success Criteria**: - [ ] All tests passing - [ ] Migration tool verified - [ ] Documentation complete - [ ] Performance targets met --- ## 8. 
Testing Strategy ### 8.1 Unit Tests ```python # tests/unit/test_memory_router.py import pytest from pathlib import Path from mcp_standards.memory.router import MemoryRouter, QueryType @pytest.mark.asyncio async def test_semantic_search(): """Test semantic search routing to AgentDB.""" router = MemoryRouter( agentdb_path=Path("/tmp/test_agentdb"), sqlite_path=Path("/tmp/test.db") ) # Store test pattern await router.store_pattern( pattern_key="test_uv_preference", pattern_data={ 'description': "use uv instead of pip for Python packages", 'category': 'python-package' }, confidence=0.8 ) # Search with semantic query results = await router.search_patterns( query="prefer uv package manager", # Different wording query_type=QueryType.SEMANTIC, top_k=5 ) # Should find the pattern despite different wording assert len(results) > 0 assert results[0]['key'] == "test_uv_preference" assert results[0]['similarity'] > 0.7 @pytest.mark.asyncio async def test_hybrid_search(): """Test hybrid search merging AgentDB + SQLite.""" # Test implementation... pass ``` ### 8.2 Integration Tests ```python # tests/integration/test_pattern_learning.py import pytest from pathlib import Path from mcp_standards.hooks.pattern_extractor import PatternExtractor @pytest.mark.asyncio async def test_semantic_clustering_promotion(): """ Test that semantically similar patterns trigger early promotion. 
v2: Should promote after 2 similar patterns (not 3) """ extractor = PatternExtractor( db_path=Path("/tmp/test.db"), memory_router=router ) # Pattern 1: "use uv not pip" patterns1 = extractor.extract_patterns( tool_name="Bash", args={'command': 'uv pip install pytest'}, result='success' ) # Pattern 2: "prefer uv package manager" (semantically similar) patterns2 = extractor.extract_patterns( tool_name="Bash", args={'command': 'prefer uv package manager for python'}, result='success' ) # Check if promoted after just 2 similar patterns preferences = await extractor.get_learned_preferences( category="python-package", min_confidence=0.7 ) # v2: Should be promoted after 2 (not 3) assert len(preferences) > 0 assert "uv" in preferences[0]['preference'].lower() ``` ### 8.3 Performance Tests ```python # tests/performance/test_benchmarks.py import pytest from mcp_standards.benchmarks.performance import PerformanceBenchmark @pytest.mark.asyncio async def test_startup_performance(): """Test that startup meets <10ms target.""" benchmark = PerformanceBenchmark(router) result = await benchmark.benchmark_startup() assert result['passed'], \ f"Startup time {result['startup_time_ms']:.2f}ms exceeds target {result['target_ms']}ms" @pytest.mark.asyncio async def test_search_performance(): """Test that search meets <1ms target.""" benchmark = PerformanceBenchmark(router) result = await benchmark.benchmark_search(num_queries=1000) assert result['passed'], \ f"Avg search time {result['avg_search_time_ms']:.2f}ms exceeds target {result['target_ms']}ms" ``` --- ## 9. 
Appendix ### 9.1 Class Diagram ``` ┌─────────────────────────────────────────────────────────────┐ │ MCP Server │ │ (ClaudeMemoryMCP) │ └───────────────────────┬─────────────────────────────────────┘ │ ┌────────────┴────────────┐ │ │ ┌──────▼──────┐ ┌──────▼──────┐ │ Memory │ │ Event Bus │ │ Router │ │ │ └──┬──────┬───┘ └──────┬──────┘ │ │ │ │ │ ┌──────┴──────┐ │ │ │ │ ┌───▼──┐ ┌▼────┐ ┌────▼────┐ ┌────▼────┐ │Agent │ │SQLite│ │CLAUDE.md│ │Config │ │ DB │ │ │ │Manager │ │Watcher │ │Adapter│ │Adapter│ └─────────┘ └─────────┘ └──────┘ └──────┘ │ │ ┌───▼───┐ ┌─▼────┐ │HNSW │ │Audit │ │Graph │ │Log │ └───────┘ └──────┘ ``` ### 9.2 Dependencies Update ```toml # pyproject.toml (v2 additions) [project] name = "mcp-standards" version = "2.0.0" dependencies = [ "mcp>=1.0.0", "sentence-transformers>=2.2.0", # Existing "faiss-cpu>=1.7.4", # Existing "numpy>=1.24.0", # Existing "networkx>=3.1", # Existing # v2 additions: "watchdog>=3.0.0", # File system monitoring "agentdb-python>=0.1.0", # AgentDB bindings (if available) ] ``` ### 9.3 Configuration File ```yaml # .mcp-standards/config.yaml (v2 config) version: "2.0" memory: agentdb: enabled: true path: "${HOME}/.mcp-standards/agentdb" mode: "disk" # 'disk' for <10ms startup, 'memory' for speed max_elements: 100000 hnsw: m: 16 ef_construction: 200 ef_search: 50 sqlite: path: "${HOME}/.mcp-standards/knowledge.db" wal_mode: true embedding: model: "all-MiniLM-L6-v2" dimension: 384 batch_size: 32 intelligence: pattern_extractor: semantic_clustering: true promotion_threshold: 2 # Reduced from 3 similarity_threshold: 0.85 reasoning_bank: enabled: true confidence_adjustment: success: 0.1 failure: -0.15 partial: 0.05 events: file_watcher: enabled: true watch_patterns: - ".editorconfig" - ".prettierrc*" - "pyproject.toml" - "package.json" proactive_suggestions: enabled: true check_interval_seconds: 300 # 5 minutes notification_threshold: 3 # patterns ready performance: startup_target_ms: 10 search_target_ms: 1 benchmarks: enabled: true 
report_path: "${HOME}/.mcp-standards/benchmarks" ``` --- ## 10. Conclusion This specification provides a complete technical architecture for integrating AgentDB into mcp-standards v2.0. The hybrid architecture (AgentDB hot path + SQLite cold path) delivers: - **50-100x performance improvement** via HNSW vector search - **Semantic pattern matching** reducing corrections from 3-5 to 1-2 - **Event-driven automation** eliminating manual CLAUDE.md updates - **ReasoningBank outcomes** enabling self-adjusting confidence - **Backward compatibility** with v1 data via migration tool **Development can begin immediately** using this specification as the blueprint. All components, APIs, and integration points are defined with concrete code examples and performance targets. **Next Steps**: 1. Create git feature branch: `feature/v2-agentdb-integration` 2. Begin Phase 1 implementation (AgentDB core) 3. Set up CI/CD for performance benchmarking 4. Create project tracking board with tasks from Phase 1-4 --- **Document Control**: - Version: 1.0 - Last Updated: 2025-10-20 - Review Status: Ready for Implementation - Approvals: Architecture Team
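As a closing illustration, the ReasoningBank confidence rule specified in Section 5.2 and configured in Section 9.3 (success +0.1, failure −0.15, partial +0.05, clamped to [0.0, 1.0]) can be sanity-checked in isolation. This is a minimal standalone sketch mirroring `_adjust_confidence`, not the production implementation:

```python
def adjust_confidence(current: float, outcome: str) -> float:
    """Heuristic additive confidence update used by ReasoningBank.

    Deltas mirror the confidence_adjustment values in config.yaml;
    unknown outcomes leave confidence unchanged. Results are clamped
    to [0.0, 1.0].
    """
    deltas = {"success": 0.10, "failure": -0.15, "partial": 0.05}
    return max(0.0, min(1.0, current + deltas.get(outcome, 0.0)))


# Worked examples (rounded to sidestep float noise):
assert round(adjust_confidence(0.70, "success"), 2) == 0.80
assert round(adjust_confidence(0.70, "failure"), 2) == 0.55
assert adjust_confidence(0.95, "success") == 1.0   # clamped at the ceiling
assert adjust_confidence(0.05, "failure") == 0.0   # clamped at the floor
```

Note the asymmetry: a failure (−0.15) costs more than a success gains (+0.1), so failing patterns are demoted faster than succeeding patterns are promoted — the demotion bias described in Phase 3.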
