# V2 Implementation Gaps Analysis

**System Architecture Designer Report**
**Date**: 2025-10-20
**Purpose**: Code-level gap analysis for V2 architecture planning

---

## Executive Summary

This analysis identifies specific implementation gaps between the current mcp-standards codebase and V2 requirements for AgentDB/ReasoningBank integration. Each gap includes concrete code-level recommendations referencing existing files.

**Critical Gaps Identified**:

1. **Performance**: SQLite FTS5 (50ms) vs AgentDB HNSW target (<1ms) - 50x gap
2. **Intelligence**: Regex-only pattern matching vs semantic embedding clustering
3. **Architecture**: Single-tier memory vs hybrid AgentDB+SQLite design
4. **Integration**: No vector embedding pipeline or HNSW graph infrastructure
5. **UX**: Manual triggers vs event-driven automation

---

## 1. Performance Gaps

### 1.1 Search Performance: SQLite FTS5 Bottleneck

**Current Implementation**: `src/intelligence/memory/persistence.py`

```python
# Line 435-487: Fallback text search (keyword matching only)
async def _text_search(self, query: str, top_k: int,
                       namespace: Optional[str],
                       include_metadata: bool) -> List[Dict]:
    """Fallback text-based search when FAISS is not available."""
    query_lower = query.lower()
    ...
    # Simple substring matching in pickled values
    if query_lower in value_text or query_lower in row[0].lower():
        # 50ms+ per query on 10K+ records
        ...
```

**Problems**:

- Linear scan through all memory entries (O(n) complexity)
- Pickle deserialization overhead for every row
- No semantic understanding: "use uv" ≠ "prefer uv package manager"
- SQLite FTS5 not utilized in current implementation
- FAISS index exists but not integrated with pattern_extractor.py

**Gap**: AgentDB HNSW target is <1ms with 116x speedup for vector search

**Code-Level Recommendations**:

```python
# NEW FILE: src/intelligence/memory/agentdb_adapter.py
import logging
from typing import List, Dict, Any, Optional

import numpy as np

logger = logging.getLogger(__name__)


class AgentDBAdapter:
    """
    AgentDB HNSW integration for ultra-fast semantic pattern search.
    Replaces SQLite FTS5 for hot-path queries.
    """

    def __init__(self,
                 dimension: int = 384,  # all-MiniLM-L6-v2
                 max_elements: int = 100_000,
                 ef_construction: int = 200,
                 M: int = 16):
        """
        Initialize AgentDB with HNSW configuration.

        Performance targets:
        - Startup: <10ms (memory mode) or ~100ms (disk mode)
        - Search: <1ms per query
        - Insert: <2ms per pattern
        """
        try:
            # AgentDB initialization (will be npm package)
            from agentdb import VectorStore, HNSWConfig

            self.config = HNSWConfig(
                dim=dimension,
                max_elements=max_elements,
                ef_construction=ef_construction,  # Build quality
                M=M,                              # Graph connectivity
                metric='cosine'                   # For normalized embeddings
            )

            # Create in-memory store (fast startup)
            self.store = VectorStore(
                path=":memory:",  # Or disk path for persistence
                config=self.config
            )
            self.dimension = dimension
        except ImportError:
            raise RuntimeError(
                "AgentDB not installed. Run: npm install agentdb"
            )

    async def add_pattern(self,
                          pattern_id: str,
                          pattern_text: str,
                          embedding: np.ndarray,
                          metadata: Dict[str, Any]) -> bool:
        """
        Add pattern to HNSW graph with metadata.

        Target: <2ms per insertion
        """
        try:
            # Store in AgentDB with metadata
            self.store.add(
                vector=embedding.tolist(),
                metadata={
                    'pattern_id': pattern_id,
                    'pattern_text': pattern_text,
                    **metadata
                }
            )
            return True
        except Exception as e:
            logger.error(f"AgentDB insertion failed: {e}")
            return False

    async def search_similar(self,
                             query_embedding: np.ndarray,
                             top_k: int = 5,
                             threshold: float = 0.7) -> List[Dict]:
        """
        Semantic search using HNSW graph.

        Target: <1ms per query (116x faster than naive search)
        """
        try:
            results = self.store.search(
                query=query_embedding.tolist(),
                k=top_k,
                threshold=threshold
            )
            return [
                {
                    'pattern_id': r['metadata']['pattern_id'],
                    'pattern_text': r['metadata']['pattern_text'],
                    'similarity': r['score'],
                    'metadata': r['metadata']
                }
                for r in results
            ]
        except Exception as e:
            logger.error(f"AgentDB search failed: {e}")
            return []

    def get_stats(self) -> Dict[str, Any]:
        """Get HNSW graph statistics."""
        return {
            'total_vectors': self.store.size(),
            'dimension': self.dimension,
            'config': {
                'M': self.config.M,
                'ef_construction': self.config.ef_construction
            }
        }
```
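For orientation, a minimal usage sketch of the adapter. The AgentDB Python API shown above is itself speculative (the package does not yet exist), so this is wiring illustration only; the random vector stands in for a real embedding from the embedding manager referenced throughout this report:

```python
# Hypothetical wiring sketch; the AgentDB API is assumed, not confirmed.
import asyncio
import numpy as np

from intelligence.memory.agentdb_adapter import AgentDBAdapter


async def main():
    adapter = AgentDBAdapter(dimension=384)

    # A pre-computed, L2-normalized 384-dim vector stands in for
    # EmbeddingManager.encode("use uv for Python package management").
    vec = np.random.rand(384).astype(np.float32)
    vec /= np.linalg.norm(vec)

    await adapter.add_pattern(
        pattern_id="correction:pip->uv",
        pattern_text="use uv for Python package management",
        embedding=vec,
        metadata={"category": "python-package"},
    )

    # Querying with the same vector should return the pattern at similarity ~1.0.
    hits = await adapter.search_similar(vec, top_k=3, threshold=0.7)
    print(hits)


asyncio.run(main())
```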
**Integration Point**: Modify `src/mcp_standards/hooks/pattern_extractor.py`

```python
# MODIFY: Line 58-62 in pattern_extractor.py
def __init__(self, db_path: Path):
    self.db_path = db_path
    self._ensure_tables()

    # ADD: AgentDB for fast semantic search
    self.agentdb = AgentDBAdapter(
        dimension=384,
        max_elements=100_000
    )

    # ADD: Embedding manager (reuse from persistence.py)
    from ..intelligence.memory.embeddings import EmbeddingManager
    self.embedder = EmbeddingManager(model_name="all-MiniLM-L6-v2")

    self._pattern_timestamps: List[datetime] = []
```

**Expected Improvement**:

- Search latency: 50ms → <1ms (50x faster)
- Semantic matching: "use uv" matches "prefer uv package manager" (currently fails)
- Scalability: Handles 100K+ patterns without degradation
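The recommendations in this report reuse `EmbeddingManager` from `src/intelligence/memory/embeddings.py` without showing it. A minimal sketch of the interface they assume, a thin wrapper over a local sentence-transformers model (the real module may differ):

```python
# Sketch of the assumed EmbeddingManager interface, not the actual module.
import numpy as np
from sentence_transformers import SentenceTransformer


class EmbeddingManager:
    """Thin wrapper around a local sentence-transformers model."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)  # 384-dim output

    def encode(self, text: str, normalize: bool = False) -> np.ndarray:
        # normalize=True yields unit vectors, so cosine similarity reduces
        # to a dot product (what the HNSW 'cosine' metric expects).
        return self.model.encode(
            text,
            normalize_embeddings=normalize,
            convert_to_numpy=True,
        )
```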
---

### 1.2 Pattern Clustering: No Semantic Grouping

**Current Implementation**: `src/mcp_standards/hooks/pattern_extractor.py`

```python
# Line 315-377: Exact pattern key matching only
def _update_pattern_frequency(self, pattern: Dict[str, Any], project_path: str):
    """Update frequency tracking for a pattern"""
    pattern_key = pattern["pattern_key"]  # Exact string match

    # Problem: "correction:pip→uv" and "correction:pip→poetry"
    # are treated as completely different patterns
    # No clustering of semantically similar corrections
```

**Gap**: No way to detect that "use uv not pip" and "prefer uv for package management" are the same pattern

**Code-Level Recommendation**:

```python
# MODIFY: pattern_extractor.py, add new method
async def _cluster_similar_patterns(self,
                                    new_pattern: Dict[str, Any],
                                    threshold: float = 0.85) -> Optional[str]:
    """
    Find semantically similar patterns using AgentDB.

    Args:
        new_pattern: Pattern to check for similarity
        threshold: Similarity threshold for clustering

    Returns:
        Existing pattern_key if similar found, else None
    """
    # Generate embedding for new pattern
    pattern_text = new_pattern.get('description', '')
    embedding = self.embedder.encode(pattern_text, normalize=True)

    # Search for similar patterns in AgentDB
    similar = await self.agentdb.search_similar(
        query_embedding=embedding,
        top_k=1,
        threshold=threshold
    )

    if similar:
        # Found similar pattern - merge instead of creating duplicate
        existing = similar[0]
        logger.info(
            f"Clustering patterns: '{pattern_text}' → '{existing['pattern_text']}' "
            f"(similarity: {existing['similarity']:.2f})"
        )
        return existing['pattern_id']

    return None


# MODIFY: Line 315 in _update_pattern_frequency
# (now async so it can await the clustering check; callers must await it)
async def _update_pattern_frequency(self, pattern: Dict[str, Any], project_path: str):
    pattern_key = pattern["pattern_key"]

    # ADD: Check for semantic similarity before creating new pattern
    existing_pattern = await self._cluster_similar_patterns(pattern)
    if existing_pattern:
        # Update existing pattern instead of creating a duplicate
        pattern_key = existing_pattern
```

**Expected Improvement**:

- Reduction in correction threshold: 3 occurrences → 2 occurrences (semantic clustering counts similar patterns)
- Deduplication: "use uv not pip" + "prefer uv" = 2 occurrences (currently treated as separate)
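The 0.85 clustering threshold above is a tunable assumption; actual similarity scores depend on the embedding model. A quick calibration snippet to check how paraphrased corrections score before settling on a threshold:

```python
# Threshold calibration: print cosine similarity for known paraphrase pairs.
# (EmbeddingManager as sketched in section 1.1; values depend on the model.)
embedder = EmbeddingManager(model_name="all-MiniLM-L6-v2")

pairs = [
    ("use uv not pip", "prefer uv for package management"),  # should cluster
    ("use uv not pip", "use ruff instead of flake8"),        # should NOT cluster
]
for a, b in pairs:
    ea = embedder.encode(a, normalize=True)
    eb = embedder.encode(b, normalize=True)
    # Dot product of unit vectors == cosine similarity.
    print(f"{a!r} vs {b!r}: {float(ea @ eb):.3f}")
```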
---

### 1.3 Startup Performance: Cold Start Delay

**Current Problem**:

- SQLite initialization: ~500ms (acceptable)
- Pattern loading: ~50-100ms for 1K patterns
- No warm cache for frequent queries

**Gap**: AgentDB targets <10ms startup in memory mode

**Code-Level Recommendation**:

```python
# NEW FILE: src/intelligence/memory/hybrid_memory.py
class HybridMemoryManager:
    """
    Dual-tier memory: AgentDB (hot path) + SQLite (cold path)

    Hot path (AgentDB):
    - Semantic pattern search (<1ms)
    - Frequent pattern queries (<1ms)
    - Recently learned patterns (<10ms startup)

    Cold path (SQLite):
    - Audit logs (50ms acceptable)
    - Full historical data (unlimited size)
    - Compliance and temporal tracking
    """

    def __init__(self,
                 sqlite_path: Path,
                 agentdb_mode: str = "memory"):  # "memory" or "disk"
        # Fast in-memory AgentDB for hot queries
        self.agentdb = AgentDBAdapter(dimension=384)

        # SQLite for durable storage
        self.sqlite = sqlite3.connect(sqlite_path)

        # Query router: semantic → AgentDB, audit → SQLite
        self.router = QueryRouter(agentdb=self.agentdb, sqlite=self.sqlite)

    async def query(self,
                    query_text: str,
                    query_type: str = "semantic") -> List[Dict]:
        """
        Route query to appropriate backend.

        semantic: AgentDB (<1ms)
        exact: SQLite FTS5 (~50ms)
        audit: SQLite (50ms+)
        """
        if query_type == "semantic":
            # Use AgentDB HNSW
            embedding = self.embedder.encode(query_text)
            return await self.agentdb.search_similar(embedding)
        elif query_type == "exact":
            # Use SQLite FTS5
            return self._sqlite_search(query_text)
        elif query_type == "audit":
            # Full historical query in SQLite
            return self._sqlite_audit_query(query_text)
```

---

## 2. Intelligence Gaps

### 2.1 Pattern Recognition: Regex-Only Detection

**Current Implementation**: `src/mcp_standards/hooks/pattern_extractor.py`

```python
# Line 23-31: Hardcoded regex patterns only
CORRECTION_PHRASES = [
    r"actually\s+(?:use|do|need|should)",
    r"instead\s+(?:of|use)",
    r"use\s+(\w+)\s+not\s+(\w+)",  # Exact phrase match only
    # ...
]

# Line 34-52: Hardcoded tool patterns
TOOL_PATTERNS = {
    "python-package": [
        (r"use\s+uv\s+not\s+pip", "use uv for Python package management"),
        # Doesn't match: "prefer uv", "always use uv", "uv is better than pip"
    ]
}
```

**Problems**:

- Fails to detect semantic variations:
  - "use uv not pip" ✓ (matches)
  - "prefer uv over pip" ✗ (doesn't match)
  - "always use uv for packages" ✗ (doesn't match)
  - "uv is better than pip" ✗ (doesn't match)
- No context understanding
- No natural language interpretation

**Gap**: ReasoningBank provides semantic pattern clustering with embeddings

**Code-Level Recommendation**:

```python
# MODIFY: pattern_extractor.py
class PatternExtractor:

    # ADD: Semantic correction detection
    async def _detect_corrections_semantic(self,
                                           tool_name: str,
                                           args: Dict[str, Any],
                                           result: Any) -> List[Dict[str, Any]]:
        """
        Detect correction patterns using semantic understanding.
        Replaces regex-only detection with embedding similarity.
        """
        patterns = []
        combined_text = f"{args} {result}"

        # Generate embedding for the text
        text_embedding = self.embedder.encode(combined_text, normalize=True)

        # Search for similar known correction patterns
        known_corrections = await self.agentdb.search_similar(
            query_embedding=text_embedding,
            top_k=5,
            threshold=0.75  # High threshold for corrections
        )

        for correction in known_corrections:
            # Found semantically similar correction
            patterns.append({
                "type": "correction",
                "pattern_key": correction['pattern_id'],
                "tool_name": tool_name,
                "category": correction['metadata']['category'],
                "description": correction['pattern_text'],
                "similarity": correction['similarity'],
                "matched_via": "semantic"  # vs "regex"
            })

        # Fallback: Also run regex detection for explicit phrases
        regex_patterns = self._detect_corrections(tool_name, args, result)

        # Merge results (deduplicate by semantic similarity)
        return self._merge_detections(patterns, regex_patterns)

    # MODIFY: Line 162-202 in extract_patterns()
    # (now async so it can await the semantic detectors)
    async def extract_patterns(self, ...):
        patterns = []

        # 1. Semantic correction detection (NEW)
        correction_patterns = await self._detect_corrections_semantic(
            tool_name, args, result
        )
        patterns.extend(correction_patterns)

        # 2. Keep existing workflow and tool preference detection
        workflow_patterns = self._detect_workflow_patterns(...)
        patterns.extend(workflow_patterns)

        # 3. Update frequency with semantic clustering
        for pattern in patterns:
            await self._update_pattern_frequency_semantic(pattern, project_path)
```

**Expected Improvement**:

- Detection rate: 40-60% current → 80-90% with semantic matching
- Correction threshold: 3 exact matches → 2 similar matches
- False negatives: Eliminated for paraphrased corrections
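`_merge_detections()` above is referenced but not defined. A minimal sketch that prefers semantic hits and keeps a regex hit only when its pattern key is not already covered (slots into `PatternExtractor`; typing imports assumed at module level):

```python
# Sketch: deduplicate semantic and regex detections by pattern_key.
def _merge_detections(self,
                      semantic: List[Dict[str, Any]],
                      regex: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    merged = list(semantic)
    seen_keys = {p["pattern_key"] for p in semantic}
    for p in regex:
        # A regex hit survives only if no semantic match already covers it.
        if p["pattern_key"] not in seen_keys:
            merged.append(p)
            seen_keys.add(p["pattern_key"])
    return merged
```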
---

### 2.2 No Outcome Tracking: Missing ReasoningBank

**Current Implementation**: No outcome tracking exists

```python
# pattern_extractor.py: Patterns are promoted based on frequency alone
# Line 378-440: _check_promotion_threshold()
PROMOTION_THRESHOLD = 3  # Promote after 3 occurrences

# Problem: No tracking of whether promoted patterns actually work
# "use X not Y" promoted after 3 occurrences, even if user keeps correcting it
```

**Gap**: ReasoningBank tracks pattern outcomes (success/failure) and adjusts confidence

**Code-Level Recommendation**:

```python
# NEW FILE: src/intelligence/reasoning/reasoning_bank.py
import json
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, Optional


class ReasoningBank:
    """
    Track pattern application outcomes and adjust confidence scores.

    Learning loop:
    1. Pattern applied → record action
    2. User corrects again → mark as failure
    3. User doesn't correct → mark as success
    4. Adjust confidence based on success rate
    """

    def __init__(self, db_path: Path):
        self.db_path = db_path
        self._ensure_tables()

    def _ensure_tables(self):
        with sqlite3.connect(self.db_path) as conn:
            # Reasoning episodes table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS reasoning_episodes (
                    id INTEGER PRIMARY KEY,
                    pattern_id TEXT NOT NULL,
                    context TEXT,
                    action_taken TEXT,
                    outcome TEXT CHECK(outcome IN ('success', 'failure', 'partial')),
                    confidence_before REAL,
                    confidence_after REAL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    project_path TEXT,
                    metadata TEXT,
                    FOREIGN KEY (pattern_id) REFERENCES pattern_frequency(pattern_key)
                )
            """)
            # Outcome tracking indexes
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_pattern_outcome
                ON reasoning_episodes(pattern_id, outcome)
            """)
            conn.commit()

    async def record_outcome(self,
                             pattern_id: str,
                             outcome: str,  # "success" | "failure" | "partial"
                             context: Dict[str, Any],
                             confidence_before: float) -> float:
        """
        Record pattern application outcome and return new confidence.

        Args:
            pattern_id: Pattern that was applied
            outcome: Result of application
            context: Execution context
            confidence_before: Confidence before application

        Returns:
            Updated confidence score
        """
        # Calculate new confidence using Bayesian update
        confidence_after = self._bayesian_confidence_update(
            pattern_id=pattern_id,
            outcome=outcome,
            confidence_before=confidence_before
        )

        # Store episode
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO reasoning_episodes
                (pattern_id, context, action_taken, outcome,
                 confidence_before, confidence_after, project_path, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                pattern_id,
                json.dumps(context.get('description', '')),
                context.get('action', 'unknown'),
                outcome,
                confidence_before,
                confidence_after,
                context.get('project_path', ''),
                json.dumps(context.get('metadata', {}))
            ))

            # Update pattern confidence in main table
            conn.execute("""
                UPDATE pattern_frequency
                SET confidence = ?
                WHERE pattern_key = ?
            """, (confidence_after, pattern_id))
            conn.commit()

        return confidence_after

    def _bayesian_confidence_update(self,
                                    pattern_id: str,
                                    outcome: str,
                                    confidence_before: float) -> float:
        """
        Bayesian update of confidence based on outcome.

        Formula:
        - Success: confidence += (1.0 - confidence) * 0.2
        - Failure: confidence -= confidence * 0.3
        - Partial: confidence += (1.0 - confidence) * 0.05
        """
        if outcome == "success":
            # Boost confidence (diminishing returns)
            delta = (1.0 - confidence_before) * 0.2
            new_confidence = min(1.0, confidence_before + delta)
        elif outcome == "failure":
            # Penalize confidence (faster decay)
            delta = confidence_before * 0.3
            new_confidence = max(0.0, confidence_before - delta)
        else:  # partial
            # Small boost for partial success
            delta = (1.0 - confidence_before) * 0.05
            new_confidence = min(1.0, confidence_before + delta)

        return new_confidence

    async def get_pattern_success_rate(self,
                                       pattern_id: str,
                                       lookback_days: int = 30) -> Dict[str, Any]:
        """
        Calculate success rate for a pattern over a time window.

        Returns:
            {
                'success_count': int,
                'failure_count': int,
                'success_rate': float,
                'recommended_confidence': float
            }
        """
        cutoff = (datetime.now() - timedelta(days=lookback_days)).isoformat()

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT
                    COUNT(CASE WHEN outcome = 'success' THEN 1 END) as success_count,
                    COUNT(CASE WHEN outcome = 'failure' THEN 1 END) as failure_count,
                    COUNT(CASE WHEN outcome = 'partial' THEN 1 END) as partial_count
                FROM reasoning_episodes
                WHERE pattern_id = ? AND timestamp > ?
            """, (pattern_id, cutoff))

            row = cursor.fetchone()
            success = row[0] or 0
            failure = row[1] or 0
            partial = row[2] or 0
            total = success + failure + partial

            if total == 0:
                return {
                    'success_count': 0,
                    'failure_count': 0,
                    'partial_count': 0,
                    'success_rate': 0.5,  # Neutral prior
                    'recommended_confidence': 0.5
                }

            # Weighted success rate (partial = 0.5 success)
            success_rate = (success + 0.5 * partial) / total

            # Recommended confidence based on success rate and sample size;
            # confidence increases with both
            sample_weight = min(1.0, total / 10)  # Full confidence after 10 trials
            recommended_confidence = success_rate * sample_weight

            return {
                'success_count': success,
                'failure_count': failure,
                'partial_count': partial,
                'success_rate': success_rate,
                'recommended_confidence': recommended_confidence,
                'total_episodes': total
            }
```
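Plugging numbers into the update rule above: from a neutral 0.5, a success moves confidence to 0.6, a failure to 0.35, and a partial outcome to 0.525. A quick check (the db path is illustrative; `_bayesian_confidence_update` never touches the database):

```python
from math import isclose
from pathlib import Path

bank = ReasoningBank(db_path=Path("reasoning.db"))  # path is illustrative

assert isclose(bank._bayesian_confidence_update("p", "success", 0.5), 0.600)  # 0.5 + 0.5*0.2
assert isclose(bank._bayesian_confidence_update("p", "failure", 0.5), 0.350)  # 0.5 - 0.5*0.3
assert isclose(bank._bayesian_confidence_update("p", "partial", 0.5), 0.525)  # 0.5 + 0.5*0.05
```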
**Integration**: Hook into pattern application

```python
# MODIFY: src/mcp_standards/intelligence/claudemd_manager.py
# Add outcome detection in update_claudemd_file()

async def apply_pattern_with_tracking(self, pattern_id: str, context: Dict):
    """Apply pattern and track outcome via ReasoningBank."""
    # Get current confidence
    with sqlite3.connect(self.db_path) as conn:
        cursor = conn.execute(
            "SELECT confidence FROM pattern_frequency WHERE pattern_key = ?",
            (pattern_id,)
        )
        confidence_before = cursor.fetchone()[0]

    # Apply the pattern (existing logic)
    result = self._apply_pattern(pattern_id, context)

    # Detect outcome: Did user correct it again?
    outcome = await self._detect_outcome(pattern_id, context)

    # Update confidence via ReasoningBank
    new_confidence = await self.reasoning_bank.record_outcome(
        pattern_id=pattern_id,
        outcome=outcome,
        context=context,
        confidence_before=confidence_before
    )

    logger.info(
        f"Pattern {pattern_id}: outcome={outcome}, "
        f"confidence {confidence_before:.2f} → {new_confidence:.2f}"
    )
```

**Expected Improvement**:

- Self-adjusting confidence: Patterns that fail repeatedly get demoted automatically
- Adaptive promotion: High-success patterns promoted faster than low-success ones
- Outcome visibility: Users can see which patterns actually work
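`_detect_outcome()` above is the missing piece. One plausible heuristic, an assumption rather than existing behavior, treats a repeat correction for the same pattern within a short window as failure and silence as success (uses the `pattern_frequency.last_seen` column shown in section 4.2; sqlite3/datetime imports assumed at module level):

```python
# Heuristic sketch: infer outcome from whether the user corrected again.
async def _detect_outcome(self, pattern_id: str,
                          context: Dict[str, Any],
                          window_minutes: int = 30) -> str:
    cutoff = (datetime.now() - timedelta(minutes=window_minutes)).isoformat()
    with sqlite3.connect(self.db_path) as conn:
        cursor = conn.execute(
            """
            SELECT COUNT(*) FROM pattern_frequency
            WHERE pattern_key = ? AND last_seen > ?
            """,
            (pattern_id, cutoff),
        )
        repeat_corrections = cursor.fetchone()[0]
    # Corrected again soon after application -> the pattern did not stick.
    return "failure" if repeat_corrections > 0 else "success"
```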
---

## 3. Architecture Gaps

### 3.1 Single-Tier Memory: No Hybrid Architecture

**Current Implementation**: Single SQLite database for everything

```python
# src/intelligence/memory/persistence.py
# Lines 84-137: Single SQLite database
def _init_db(self):
    # All data in one database:
    # - Memory entries (hot path)
    # - Embeddings (warm path)
    # - Statistics (cold path)
    # - Audit logs (cold path)
```

**Problems**:

- No separation of hot/cold data paths
- SQLite contention under concurrent access
- Vector search competes with audit logging for locks
- No optimization for different query patterns

**Gap**: V2 requires hybrid AgentDB (hot) + SQLite (cold) architecture

**Code-Level Recommendation**:

```python
# REWRITE: src/intelligence/memory/persistence.py → hybrid_memory.py
import asyncio
import sqlite3
from pathlib import Path
from typing import Any, Dict, List


class HybridMemoryManager:
    """
    Dual-tier architecture for optimal performance.

    Tier 1 (Hot Path - AgentDB):
    - Semantic pattern search (<1ms)
    - Recent corrections (last 7 days)
    - Frequently accessed patterns
    - HNSW graph for vector similarity

    Tier 2 (Cold Path - SQLite):
    - Full historical data (unlimited)
    - Audit logs for compliance
    - Statistical analysis
    - Temporal tracking
    """

    def __init__(self,
                 sqlite_path: Path,
                 agentdb_cache_size: int = 10_000):
        # Hot tier: AgentDB for fast semantic search
        self.agentdb = AgentDBAdapter(
            dimension=384,
            max_elements=agentdb_cache_size
        )

        # Cold tier: SQLite for durable storage
        self.sqlite_path = sqlite_path
        self._init_sqlite()

        # Sync layer: Keep both tiers consistent
        self.sync_manager = TierSyncManager(
            hot_tier=self.agentdb,
            cold_tier=self.sqlite_path
        )

        # Query router
        self.router = QueryRouter(self)

    async def store(self, key: str, value: Any, **kwargs):
        """
        Store in both tiers with appropriate indexing.

        Flow:
        1. Generate embedding (once)
        2. Store in AgentDB (hot tier, <2ms)
        3. Store in SQLite (cold tier, async, 10-50ms)
        """
        # Generate embedding
        embedding = self._generate_embedding(value)

        # Hot tier: Fast vector storage
        await self.agentdb.add_pattern(
            pattern_id=key,
            pattern_text=str(value),
            embedding=embedding,
            metadata=kwargs.get('metadata', {})
        )

        # Cold tier: Durable storage (async, non-blocking)
        asyncio.create_task(
            self._sqlite_store(key, value, embedding, **kwargs)
        )

    async def query(self, query_text: str, mode: str = "auto") -> List[Dict]:
        """
        Route query to appropriate tier.

        Modes:
        - semantic: AgentDB HNSW search (<1ms)
        - exact: SQLite FTS5 search (~50ms)
        - audit: SQLite full scan (50ms+)
        - auto: Route based on query type
        """
        return await self.router.route_query(query_text, mode)

    async def promote_to_hot_tier(self, pattern_ids: List[str]):
        """
        Promote frequently accessed patterns from SQLite to AgentDB.

        Background job: Runs every 5 minutes
        - Identify hot patterns (accessed 5+ times in last hour)
        - Load from SQLite
        - Add to AgentDB cache
        """
        pass

    async def demote_to_cold_tier(self, pattern_ids: List[str]):
        """
        Demote rarely accessed patterns from AgentDB to SQLite-only.

        Background job: Runs every hour
        - Identify cold patterns (not accessed in 7 days)
        - Keep in SQLite only
        - Remove from AgentDB cache
        """
        pass


class TierSyncManager:
    """
    Keeps hot and cold tiers synchronized.

    Sync strategies:
    - Write-through: Write to both tiers immediately
    - Write-back: Write to hot tier, async to cold tier
    - Lazy-sync: Batch sync every N seconds
    """

    def __init__(self, hot_tier, cold_tier):
        self.hot = hot_tier
        self.cold = cold_tier
        self.pending_syncs = []

    async def sync_hot_to_cold(self, pattern_id: str):
        """Sync a pattern from AgentDB to SQLite."""
        # Get from hot tier
        pattern_data = await self.hot.get(pattern_id)

        # Write to cold tier (async)
        await self.cold.store(pattern_id, pattern_data)

    async def periodic_sync(self):
        """Background job: Sync all pending changes every 60s."""
        while True:
            await asyncio.sleep(60)

            # Batch sync all pending changes
            for pattern_id in self.pending_syncs:
                await self.sync_hot_to_cold(pattern_id)
            self.pending_syncs.clear()


class QueryRouter:
    """
    Routes queries to appropriate tier based on characteristics.

    Routing logic:
    - Semantic queries → AgentDB (fast)
    - Exact match → SQLite (acceptable)
    - Historical → SQLite (cold path)
    - Audit → SQLite (compliance)
    """

    def __init__(self, hybrid_manager):
        self.manager = hybrid_manager

    async def route_query(self, query: str, mode: str = "auto") -> List[Dict]:
        """Route query to optimal tier."""
        if mode == "semantic":
            # AgentDB HNSW search
            embedding = self.manager._generate_embedding(query)
            return await self.manager.agentdb.search_similar(embedding)
        elif mode == "exact":
            # SQLite FTS5 search
            return await self.manager._sqlite_exact_search(query)
        elif mode == "audit":
            # SQLite full historical query
            return await self.manager._sqlite_audit_query(query)
        else:  # auto
            # Heuristic routing
            if len(query.split()) > 3:
                # Natural language → semantic search
                return await self.route_query(query, mode="semantic")
            else:
                # Short query → exact match
                return await self.route_query(query, mode="exact")
```
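`promote_to_hot_tier()` is left as a stub above. A minimal sketch, assuming the cold tier stores pattern text plus a serialized float32 embedding in a `patterns` table (that schema is an assumption; json/numpy/sqlite3 imports assumed at module level):

```python
# Sketch: hydrate hot-tier entries from SQLite rows (assumed schema).
async def promote_to_hot_tier(self, pattern_ids: List[str]):
    with sqlite3.connect(self.sqlite_path) as conn:
        for pattern_id in pattern_ids:
            row = conn.execute(
                "SELECT pattern_text, embedding, metadata FROM patterns "
                "WHERE pattern_id = ?",
                (pattern_id,),
            ).fetchone()
            if row is None:
                continue
            text, embedding_blob, metadata_json = row
            await self.agentdb.add_pattern(
                pattern_id=pattern_id,
                pattern_text=text,
                # Embeddings assumed stored as raw float32 bytes.
                embedding=np.frombuffer(embedding_blob, dtype=np.float32),
                metadata=json.loads(metadata_json or "{}"),
            )
```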
**Expected Improvement**:

- Query latency: 50ms → <1ms for semantic queries (50x faster)
- Scalability: Handle 100K+ patterns without degradation
- Concurrency: No lock contention between hot/cold paths

---

### 3.2 No Event-Driven Architecture

**Current Implementation**: Manual triggers in `claudemd_manager.py`

```python
# Line 231-287: update_claudemd_file() must be called explicitly
def update_claudemd_file(self, file_path: Path, ...):
    """
    Update CLAUDE.md file with learned preferences

    Problem: User must manually call this function
    No automatic detection of:
    - Config file changes (.editorconfig, pyproject.toml)
    - Pattern promotion threshold reached
    - Cross-project pattern repetition
    """
```

**Gap**: V2 requires event-driven automation with file watchers and background jobs

**Code-Level Recommendation**:

```python
# NEW FILE: src/intelligence/events/event_bus.py
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, List, Any

logger = logging.getLogger(__name__)


@dataclass
class Event:
    """Event data structure."""
    type: str
    payload: Dict[str, Any]
    timestamp: datetime
    source: str


class EventBus:
    """
    Pub/sub event bus for system-wide coordination.

    Events:
    - pattern_detected: New pattern found
    - pattern_promoted: Pattern ready for CLAUDE.md
    - config_changed: Project config file changed
    - claudemd_update_needed: Suggestions available
    """

    def __init__(self):
        self._subscribers: Dict[str, List[Callable]] = {}
        self._event_queue = asyncio.Queue()

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to event type."""
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        self._subscribers[event_type].append(handler)

    async def emit(self, event: Event):
        """Queue an event for the processing loop started by start()."""
        await self._event_queue.put(event)

    async def publish(self, event: Event):
        """Publish event to all subscribers."""
        if event.type in self._subscribers:
            for handler in self._subscribers[event.type]:
                try:
                    await handler(event)
                except Exception as e:
                    logger.error(f"Event handler error: {e}")

    async def start(self):
        """Start event processing loop."""
        while True:
            event = await self._event_queue.get()
            await self.publish(event)
```
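A minimal usage sketch of the bus, showing a subscription and a direct publish (events can also be queued via `emit()` and drained by `start()`):

```python
import asyncio
from datetime import datetime


async def log_pattern(event: Event) -> None:
    print(f"[{event.source}] {event.type}: {event.payload}")


async def main() -> None:
    bus = EventBus()
    bus.subscribe("pattern_detected", log_pattern)
    await bus.publish(Event(
        type="pattern_detected",
        payload={"pattern_key": "correction:pip->uv"},
        timestamp=datetime.now(),
        source="demo",
    ))


asyncio.run(main())
```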
```python
# NEW FILE: src/intelligence/events/config_watcher.py
import asyncio
from datetime import datetime
from pathlib import Path

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from .event_bus import Event, EventBus


class ConfigFileWatcher(FileSystemEventHandler):
    """
    Watch project config files and trigger CLAUDE.md updates.

    Watched files:
    - .editorconfig
    - pyproject.toml
    - .prettierrc
    - package.json
    - tsconfig.json
    """

    WATCHED_FILES = {
        '.editorconfig', 'pyproject.toml', '.prettierrc',
        '.prettierrc.json', 'package.json', 'tsconfig.json',
        '.eslintrc.json'
    }

    def __init__(self, event_bus: EventBus, project_path: str):
        self.event_bus = event_bus
        self.project_path = project_path
        self.observer = Observer()
        # Watchdog callbacks fire on a worker thread, so keep a handle to
        # the event loop for thread-safe publishing from on_modified().
        self.loop = asyncio.get_event_loop()

    def on_modified(self, event):
        """Handle file modification events."""
        if event.is_directory:
            return

        filename = Path(event.src_path).name
        if filename in self.WATCHED_FILES:
            # Config file changed → trigger CLAUDE.md update
            # (run_coroutine_threadsafe because this runs off the loop thread)
            asyncio.run_coroutine_threadsafe(
                self.event_bus.publish(Event(
                    type="config_changed",
                    payload={
                        'file_path': event.src_path,
                        'project_path': self.project_path
                    },
                    timestamp=datetime.now(),
                    source="config_watcher"
                )),
                self.loop
            )

    def start(self):
        """Start watching project directory."""
        self.observer.schedule(self, self.project_path, recursive=False)
        self.observer.start()

    def stop(self):
        """Stop watching."""
        self.observer.stop()
        self.observer.join()
```

```python
# NEW FILE: src/intelligence/events/proactive_suggester.py
import asyncio
import logging
import sqlite3
from datetime import datetime
from pathlib import Path

from .event_bus import Event, EventBus

logger = logging.getLogger(__name__)


class ProactiveSuggester:
    """
    Background job: Proactively suggest CLAUDE.md updates.

    Runs every 5 minutes:
    1. Check for patterns ready for promotion
    2. Check for cross-project patterns
    3. Send MCP notification if updates available
    """

    def __init__(self,
                 db_path: Path,
                 event_bus: EventBus,
                 check_interval: int = 300):  # 5 minutes
        self.db_path = db_path
        self.event_bus = event_bus
        self.check_interval = check_interval

    async def start(self):
        """Start background checking loop."""
        while True:
            await asyncio.sleep(self.check_interval)
            await self.check_for_updates()

    async def check_for_updates(self):
        """Check for patterns ready for promotion."""
        with sqlite3.connect(self.db_path) as conn:
            # Find patterns ready for promotion
            cursor = conn.execute("""
                SELECT pattern_key, pattern_description,
                       occurrence_count, confidence
                FROM pattern_frequency
                WHERE occurrence_count >= 3
                  AND promoted_to_preference = FALSE
                  AND confidence >= 0.8
            """)
            promotable = cursor.fetchall()

        if promotable:
            # Publish event
            await self.event_bus.publish(Event(
                type="claudemd_update_needed",
                payload={
                    'pattern_count': len(promotable),
                    'patterns': [
                        {
                            'key': row[0],
                            'description': row[1],
                            'count': row[2],
                            'confidence': row[3]
                        }
                        for row in promotable
                    ]
                },
                timestamp=datetime.now(),
                source="proactive_suggester"
            ))
            logger.info(
                f"Found {len(promotable)} patterns ready for CLAUDE.md"
            )
```
**Integration**: Wire up events in server

```python
# MODIFY: src/mcp_standards/server.py
class MCPStandardsServer:

    def __init__(self):
        # Initialize event bus
        self.event_bus = EventBus()

        # Initialize components
        self.pattern_extractor = PatternExtractor(self.db_path)
        self.claudemd_manager = ClaudeMdManager(self.db_path)

        # Subscribe to events
        self.event_bus.subscribe(
            "pattern_promoted", self._handle_pattern_promotion
        )
        self.event_bus.subscribe(
            "config_changed", self._handle_config_change
        )
        self.event_bus.subscribe(
            "claudemd_update_needed", self._handle_update_notification
        )

        # Start watchers
        self.config_watcher = ConfigFileWatcher(
            event_bus=self.event_bus,
            project_path=os.getcwd()
        )
        self.config_watcher.start()

        # Start background jobs
        self.suggester = ProactiveSuggester(
            db_path=self.db_path,
            event_bus=self.event_bus
        )
        asyncio.create_task(self.suggester.start())

    async def _handle_pattern_promotion(self, event: Event):
        """Auto-update CLAUDE.md when pattern promoted."""
        pattern = event.payload

        # Update CLAUDE.md file automatically
        await self.claudemd_manager.update_claudemd_file(
            file_path=Path.home() / ".claude" / "CLAUDE.md",
            min_confidence=0.8
        )
        logger.info(f"Auto-updated CLAUDE.md for pattern: {pattern['description']}")

    async def _handle_config_change(self, event: Event):
        """Re-parse config and update CLAUDE.md."""
        file_path = event.payload['file_path']
        # Parse new config
        # Update CLAUDE.md with new conventions
        pass

    async def _handle_update_notification(self, event: Event):
        """Send MCP notification to user."""
        pattern_count = event.payload['pattern_count']

        # Send notification via MCP protocol
        await self.notify_user(
            title="CLAUDE.md Updates Available",
            message=f"Found {pattern_count} new patterns ready for promotion",
            actions=["Update Now", "Review", "Dismiss"]
        )
```

**Expected Improvement**:

- Zero manual triggers: All updates happen automatically
- Real-time config parsing: Changes detected within 1 second
- Proactive notifications: User informed when updates ready
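`_handle_config_change()` is left as a stub above. One plausible implementation for `pyproject.toml` is sketched below; the key-to-convention mapping and the reuse of `update_claudemd_file()` are assumptions, and other config parsers are omitted:

```python
import tomllib  # stdlib TOML parser, Python 3.11+
from pathlib import Path


async def _handle_config_change(self, event: Event):
    """Re-parse a changed config file and refresh CLAUDE.md (sketch)."""
    file_path = Path(event.payload["file_path"])
    if file_path.name != "pyproject.toml":
        return  # parsers for the other watched files omitted in this sketch

    with file_path.open("rb") as f:
        config = tomllib.load(f)

    # Example convention extraction: line length from a [tool.ruff] table.
    line_length = config.get("tool", {}).get("ruff", {}).get("line-length")
    if line_length:
        logger.info(f"Detected line-length={line_length} from pyproject.toml")

    await self.claudemd_manager.update_claudemd_file(
        file_path=Path.home() / ".claude" / "CLAUDE.md",
        min_confidence=0.8,
    )
```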
---

## 4. Integration Gaps

### 4.1 No AgentDB Package Integration

**Current State**:

- FAISS available but not integrated into pattern_extractor.py
- No AgentDB npm package
- No HNSW graph infrastructure

**Gap**: AgentDB npm package with <10ms startup

**Code-Level Recommendation**:

Add `package.json` (create if not exists) with the AgentDB dependency:

```json
{
  "name": "mcp-standards",
  "version": "2.0.0",
  "dependencies": {
    "agentdb": "^1.0.0"
  },
  "scripts": {
    "postinstall": "node scripts/init-agentdb.js"
  }
}
```

```javascript
// NEW FILE: scripts/init-agentdb.js
/**
 * Initialize AgentDB on first install
 */
const { VectorStore } = require('agentdb');
const path = require('path');
const fs = require('fs');

async function initializeAgentDB() {
    const dbPath = path.join(process.env.HOME, '.mcp-standards', 'agentdb');

    // Create directory
    fs.mkdirSync(dbPath, { recursive: true });

    // Initialize vector store
    const store = new VectorStore({
        path: dbPath,
        dimension: 384,  // all-MiniLM-L6-v2
        metric: 'cosine'
    });

    console.log('AgentDB initialized at:', dbPath);
}

initializeAgentDB().catch(console.error);
```

```python
# ADD: Python wrapper for AgentDB
# NEW FILE: src/intelligence/memory/agentdb_bridge.py
import json
import subprocess
from typing import List, Dict, Any

import numpy as np


class AgentDBBridge:
    """
    Python bridge to AgentDB npm package.

    Uses Node.js child process to access AgentDB functionality.
    Alternative: Use rust bindings for native Python integration.
    """

    def __init__(self, db_path: str):
        self.db_path = db_path

        # Check if AgentDB is installed
        try:
            subprocess.run(['npx', 'agentdb', '--version'],
                           check=True, capture_output=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            raise RuntimeError(
                "AgentDB not installed. Run: npm install agentdb"
            )

    def add(self, vector: np.ndarray, metadata: Dict) -> str:
        """Add vector to AgentDB."""
        cmd = [
            'npx', 'agentdb', 'add',
            '--vector', json.dumps(vector.tolist()),
            '--metadata', json.dumps(metadata)
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        return result.stdout.strip()

    def search(self,
               query_vector: np.ndarray,
               top_k: int = 5,
               threshold: float = 0.7) -> List[Dict]:
        """Search AgentDB for similar vectors."""
        cmd = [
            'npx', 'agentdb', 'search',
            '--vector', json.dumps(query_vector.tolist()),
            '--top-k', str(top_k),
            '--threshold', str(threshold)
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        return json.loads(result.stdout)
```

---

### 4.2 No Embedding Pipeline

**Current State**: `persistence.py` has embeddings but they are not used in pattern learning

```python
# src/intelligence/memory/embeddings.py exists but unused
# pattern_extractor.py uses regex only (no embeddings)
```

**Gap**: Need embedding pipeline integrated into pattern detection

**Code-Level Recommendation**:

```python
# MODIFY: src/mcp_standards/hooks/pattern_extractor.py

# Add embedding generation to __init__
def __init__(self, db_path: Path):
    self.db_path = db_path
    self._ensure_tables()

    # ADD: Initialize embedding manager
    from ..intelligence.memory.embeddings import EmbeddingManager
    self.embedder = EmbeddingManager(model_name="all-MiniLM-L6-v2")

    # ADD: Initialize AgentDB
    from ..intelligence.memory.agentdb_adapter import AgentDBAdapter
    self.agentdb = AgentDBAdapter(dimension=384)

    self._pattern_timestamps: List[datetime] = []


# MODIFY: _update_pattern_frequency to include embeddings
# (async so the AgentDB write can be awaited)
async def _update_pattern_frequency(self, pattern: Dict[str, Any], project_path: str):
    pattern_key = pattern["pattern_key"]
    pattern_text = pattern.get("description", "")

    # Generate embedding for pattern
    embedding = self.embedder.encode(pattern_text, normalize=True)

    with sqlite3.connect(self.db_path) as conn:
        cursor = conn.execute(
            "SELECT id, occurrence_count FROM pattern_frequency WHERE pattern_key = ?",
            (pattern_key,)
        )
        row = cursor.fetchone()

        if row:
            # Update existing
            pattern_id, count = row
            new_count = count + 1
            confidence = min(1.0, new_count / 10)

            conn.execute("""
                UPDATE pattern_frequency
                SET occurrence_count = ?, confidence = ?,
                    last_seen = ?, pattern_description = ?
                WHERE id = ?
            """, (new_count, confidence, datetime.now().isoformat(),
                  pattern_text, pattern_id))
        else:
            # Insert new
            conn.execute("""
                INSERT INTO pattern_frequency
                (pattern_key, tool_name, pattern_type, pattern_description,
                 occurrence_count, confidence, examples)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                pattern_key,
                pattern["tool_name"],
                pattern["type"],
                pattern_text,
                1,
                0.1,
                json.dumps([pattern])
            ))
            pattern_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]

        conn.commit()

    # ADD: Store in AgentDB for semantic search
    await self.agentdb.add_pattern(
        pattern_id=pattern_key,
        pattern_text=pattern_text,
        embedding=embedding,
        metadata={
            'tool_name': pattern["tool_name"],
            'type': pattern["type"],
            'category': pattern.get("category", "general"),
            'occurrence_count': new_count if row else 1
        }
    )
```
---

## 5. User Experience Gaps

### 5.1 Manual Pattern Promotion

**Current Implementation**: `claudemd_manager.py` lines 382-477

```python
def check_for_promotion(self, project_path: str, threshold: int = 3):
    """
    Check if project-specific preferences should be promoted to global.

    Problem: User must manually call this function
    No automatic promotion when pattern appears in 3+ projects
    """

def promote_to_global(self, category: str, preference: str):
    """
    Promote a project-specific preference to global.

    Problem: User must manually specify which pattern to promote
    No automatic detection of cross-project patterns
    """
```

**Gap**: V2 requires automatic cross-project promotion

**Code-Level Recommendation**:

```python
# ADD: Automatic promotion background job
# NEW FILE: src/intelligence/events/auto_promoter.py
import asyncio
import logging
from datetime import datetime
from pathlib import Path

from .event_bus import Event, EventBus

logger = logging.getLogger(__name__)


class AutoPromoter:
    """
    Background job: Automatically promote cross-project patterns.

    Runs every 30 minutes:
    1. Find patterns appearing in 3+ projects
    2. Calculate average confidence
    3. Promote to global CLAUDE.md
    4. Notify user
    """

    def __init__(self,
                 db_path: Path,
                 claudemd_manager: ClaudeMdManager,
                 event_bus: EventBus):
        self.db_path = db_path
        self.claudemd_manager = claudemd_manager
        self.event_bus = event_bus

    async def start(self):
        """Start auto-promotion loop."""
        while True:
            await asyncio.sleep(1800)  # 30 minutes
            await self.check_and_promote()

    async def check_and_promote(self):
        """Check for patterns ready for global promotion."""
        # Find cross-project patterns
        promotable = self.claudemd_manager.check_for_promotion(
            project_path=None,  # All projects
            threshold=3
        )

        for pattern in promotable:
            # Automatically promote
            success = self.claudemd_manager.promote_to_global(
                category=pattern['category'],
                preference=pattern['preference']
            )

            if success:
                # Update global CLAUDE.md
                await self.claudemd_manager.update_claudemd_file(
                    file_path=Path.home() / ".claude" / "CLAUDE.md",
                    project_path=None  # Global
                )

                # Publish event
                await self.event_bus.publish(Event(
                    type="pattern_promoted_to_global",
                    payload={
                        'category': pattern['category'],
                        'preference': pattern['preference'],
                        'project_count': pattern['project_count'],
                        'avg_confidence': pattern['avg_confidence']
                    },
                    timestamp=datetime.now(),
                    source="auto_promoter"
                ))

                logger.info(
                    f"Auto-promoted pattern to global: {pattern['preference']} "
                    f"(seen in {pattern['project_count']} projects)"
                )
```

---

### 5.2 No User Notifications

**Current State**: No MCP notification system

**Gap**: V2 requires MCP notifications for:

- Patterns ready for promotion
- CLAUDE.md updates available
- Config file changes detected

**Code-Level Recommendation**:

```python
# MODIFY: src/mcp_standards/server.py
# Add MCP notification support

class MCPStandardsServer:

    async def notify_user(self,
                          title: str,
                          message: str,
                          actions: List[str] = None):
        """
        Send notification via MCP protocol.

        MCP notification format:
        {
            "jsonrpc": "2.0",
            "method": "notifications/message",
            "params": {
                "level": "info",
                "message": "...",
                "actions": [...]
            }
        }
        """
        notification = {
            "jsonrpc": "2.0",
            "method": "notifications/message",
            "params": {
                "level": "info",
                "message": f"{title}\n\n{message}",
                "actions": actions or []
            }
        }

        # Send via MCP transport
        await self.write_notification(notification)

    async def _handle_update_notification(self, event: Event):
        """Handle CLAUDE.md update notifications."""
        patterns = event.payload['patterns']

        await self.notify_user(
            title="🎯 New Patterns Learned",
            message=f"Found {len(patterns)} patterns ready for CLAUDE.md:\n" +
                    "\n".join([f"- {p['description']}" for p in patterns]),
            actions=["Update CLAUDE.md", "Review Patterns", "Dismiss"]
        )
```
---

## 6. Summary of Critical Gaps

### Performance Gaps (Priority: HIGH)

1. **Search Latency**: 50ms → <1ms target (50x improvement needed)
   - File: `src/intelligence/memory/persistence.py` (lines 435-487)
   - Solution: Add `agentdb_adapter.py` with HNSW graph
2. **Pattern Clustering**: No semantic grouping (0% → 80% accuracy)
   - File: `src/mcp_standards/hooks/pattern_extractor.py` (lines 315-377)
   - Solution: Add `_cluster_similar_patterns()` method
3. **Startup Time**: 500ms → <10ms target
   - File: Multiple (initialization scattered)
   - Solution: Create `hybrid_memory.py` with tiered architecture

### Intelligence Gaps (Priority: HIGH)

4. **Regex-Only Detection**: 40% recall → 90% target
   - File: `src/mcp_standards/hooks/pattern_extractor.py` (lines 23-52)
   - Solution: Add `_detect_corrections_semantic()` method
5. **No Outcome Tracking**: Patterns promoted blindly
   - File: None (missing component)
   - Solution: Create `src/intelligence/reasoning/reasoning_bank.py`

### Architecture Gaps (Priority: MEDIUM)

6. **Single-Tier Memory**: No hot/cold separation
   - File: `src/intelligence/memory/persistence.py` (entire file)
   - Solution: Rewrite as `hybrid_memory.py` with AgentDB + SQLite
7. **No Event System**: Manual triggers only
   - File: `src/mcp_standards/intelligence/claudemd_manager.py` (lines 231-287)
   - Solution: Create `src/intelligence/events/event_bus.py`

### Integration Gaps (Priority: MEDIUM)

8. **No AgentDB Package**: FAISS exists but not used
   - File: None (missing npm package)
   - Solution: Add `package.json` with agentdb dependency
9. **No Embedding Pipeline**: Embeddings unused in pattern learning
   - File: `src/intelligence/memory/embeddings.py` (isolated, not integrated)
   - Solution: Integrate into `pattern_extractor.py.__init__()`

### UX Gaps (Priority: LOW)

10. **Manual Promotion**: No auto cross-project promotion
    - File: `src/mcp_standards/intelligence/claudemd_manager.py` (lines 382-477)
    - Solution: Create `src/intelligence/events/auto_promoter.py`
11. **No Notifications**: No user feedback for updates
    - File: `src/mcp_standards/server.py` (missing method)
    - Solution: Add `notify_user()` method with MCP protocol

---

## 7. Implementation Roadmap

### Phase 1: AgentDB Core (Week 1-2)

**Files to Create**:
- `src/intelligence/memory/agentdb_adapter.py` (200 LOC)
- `src/intelligence/memory/hybrid_memory.py` (400 LOC)
- `package.json` (20 LOC)
- `scripts/init-agentdb.js` (50 LOC)

**Files to Modify**:
- `src/mcp_standards/hooks/pattern_extractor.py` (+50 LOC)
- `src/intelligence/memory/persistence.py` (refactor into hybrid_memory.py)

**Expected Outcome**: 50x search performance improvement

---

### Phase 2: Semantic Intelligence (Week 3)

**Files to Create**:
- `src/intelligence/reasoning/reasoning_bank.py` (300 LOC)
- Tests for semantic clustering

**Files to Modify**:
- `src/mcp_standards/hooks/pattern_extractor.py` (+100 LOC for semantic detection)
- `src/mcp_standards/intelligence/claudemd_manager.py` (+50 LOC for outcome tracking)

**Expected Outcome**: 1.5x reduction in correction threshold (3 → 2)

---

### Phase 3: Event-Driven Automation (Week 4)

**Files to Create**:
- `src/intelligence/events/event_bus.py` (150 LOC)
- `src/intelligence/events/config_watcher.py` (100 LOC)
- `src/intelligence/events/proactive_suggester.py` (150 LOC)
- `src/intelligence/events/auto_promoter.py` (100 LOC)

**Files to Modify**:
- `src/mcp_standards/server.py` (+100 LOC for event wiring)

**Expected Outcome**: Zero-touch CLAUDE.md updates

---

### Phase 4: Polish & Testing (Week 5)

**Files to Create**:
- `tests/test_agentdb_integration.py`
- `tests/test_reasoning_bank.py`
- `tests/test_event_system.py`
- `docs/architecture/v2-migration-guide.md`

**Files to Modify**:
- Re-enable tests in `tests/_disabled/` directory (200+ tests)

**Expected Outcome**: >80% test coverage, production-ready

---

## 8. Risk Mitigation

### Risk: AgentDB npm package not available

**Mitigation**: Fallback to pure FAISS implementation

```python
# agentdb_adapter.py can use FAISS if AgentDB unavailable
try:
    from agentdb import VectorStore
except ImportError:
    logger.warning("AgentDB not available, using FAISS fallback")
    from .faiss_fallback import FAISSStore as VectorStore
```

### Risk: Embedding API costs

**Mitigation**: Default to local sentence-transformers

```python
# embeddings.py already uses local all-MiniLM-L6-v2 model
# No API calls required
```

### Risk: Data migration complexity

**Mitigation**: Dual-read during transition

```python
# hybrid_memory.py supports both old and new formats
async def migrate_from_v1(self, old_db_path: Path):
    """Migrate v1 SQLite data to v2 hybrid architecture."""
    # Read from old SQLite
    # Generate embeddings for existing patterns
    # Populate AgentDB
    # Keep SQLite as fallback
```
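A fuller sketch of that migration helper, assuming the v1 `pattern_frequency` columns shown in section 4.2 and the hot-tier components from section 1.1 (`self.embedder` is the assumed `EmbeddingManager`; sqlite3 imported at module level):

```python
# Sketch: one-shot v1 -> v2 migration (v1 schema per sections 2 and 4).
async def migrate_from_v1(self, old_db_path: Path):
    """Migrate v1 SQLite patterns into the v2 hybrid architecture."""
    with sqlite3.connect(old_db_path) as conn:
        rows = conn.execute(
            """
            SELECT pattern_key, pattern_description, tool_name,
                   pattern_type, occurrence_count, confidence
            FROM pattern_frequency
            """
        ).fetchall()

    for key, description, tool_name, ptype, count, confidence in rows:
        # Embeddings did not exist in v1; generate them on the way in.
        embedding = self.embedder.encode(description or key, normalize=True)
        await self.agentdb.add_pattern(
            pattern_id=key,
            pattern_text=description or key,
            embedding=embedding,
            metadata={
                "tool_name": tool_name,
                "type": ptype,
                "occurrence_count": count,
                "confidence": confidence,
            },
        )
    # SQLite stays in place as the cold tier, so no v1 data is deleted.
```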
---

## 9. Success Metrics

| Metric | V1 Current | V2 Target | Gap |
|--------|-----------|-----------|-----|
| Search latency | 50ms+ | <1ms | 50x |
| Pattern detection | 40% recall | 90% recall | 2.25x |
| Correction threshold | 3 occurrences | 2 occurrences | 1.5x |
| Startup time | ~500ms | <10ms | 50x |
| CLAUDE.md updates | Manual | Automatic | ∞ |
| Cross-project learning | Manual | Automatic | ∞ |

---

**End of Implementation Gaps Analysis**
