Skip to main content
Glama
working_memory_api.py68.7 kB
""" Working Memory Dashboard API Provides real-time working memory management and optimization endpoints for the UMS Explorer. """ import asyncio import difflib import hashlib import json import sqlite3 import time from dataclasses import asdict, dataclass from typing import Dict, List, Optional from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from pydantic import BaseModel @dataclass class WorkingMemoryItem: """Enhanced memory item with working memory specific metadata.""" memory_id: str content: str memory_type: str memory_level: str importance: int confidence: float created_at: float last_accessed_at: Optional[float] access_count: int workflow_id: Optional[str] # Working memory specific fields temperature: float = 0.0 # Activity level (0-100) priority: str = "medium" # critical, high, medium, low access_frequency: float = 0.0 # Normalized access frequency retention_score: float = 0.0 # How likely to remain in working memory added_at: float = 0.0 # When added to working memory @dataclass class QualityIssue: """Represents a memory quality issue.""" issue_id: str issue_type: str # duplicate, orphaned, low_quality, stale, corrupted severity: str # critical, high, medium, low memory_ids: List[str] title: str description: str recommendation: str impact_score: float auto_fixable: bool estimated_savings: Dict[str, float] # storage, performance, clarity metadata: Dict @dataclass class QualityAnalysisResult: """Result of memory quality analysis.""" total_memories: int issues_found: int duplicates: int orphaned: int low_quality: int stale_memories: int corrupted: int overall_score: float # 0-100 issues: List[QualityIssue] recommendations: List[str] analysis_time: float @dataclass class DuplicateCluster: """Group of duplicate or similar memories.""" cluster_id: str memory_ids: List[str] similarity_score: float primary_memory_id: str # Best quality memory in cluster duplicate_count: int content_preview: str metadata: Dict @dataclass class BulkOperation: """Represents a bulk operation on memories.""" operation_id: str operation_type: str # delete, merge, update, archive memory_ids: List[str] preview_changes: List[Dict] estimated_impact: Dict[str, float] reversible: bool confirmation_required: bool @dataclass class WorkingMemoryStats: """Working memory statistics and metrics.""" active_count: int capacity: int pressure: float # 0-100% temperature: float # Average activity level focus_score: float # 0-100% efficiency: float # 0-100% avg_retention_time: float total_accesses: int last_updated: float @dataclass class OptimizationSuggestion: """Memory optimization suggestion.""" id: str title: str description: str priority: str # high, medium, low impact: str # High, Medium, Low icon: str action: str confidence: float = 0.0 estimated_improvement: Dict[str, float] = None class WorkingMemoryRequest(BaseModel): memory_id: str class OptimizationRequest(BaseModel): suggestion_id: str class FocusModeRequest(BaseModel): mode: str # normal, deep, creative, analytical, maintenance retention_time: Optional[int] = None max_working_memory: Optional[int] = None class QualityAnalysisRequest(BaseModel): analysis_type: str = "comprehensive" # comprehensive, duplicates, orphaned, low_quality include_stale: bool = True include_low_importance: bool = True similarity_threshold: float = 0.85 stale_threshold_days: int = 30 class BulkOperationRequest(BaseModel): operation_type: str # delete, merge, archive, update memory_ids: List[str] merge_strategy: Optional[str] = "preserve_highest_importance" # For merge operations target_memory_id: Optional[str] = None # For merge operations update_data: Optional[Dict] = None # For update operations class MemoryQualityInspector: """Core memory quality analysis and management logic.""" def __init__(self, db_path: str = "storage/unified_agent_memory.db"): self.db_path = db_path def get_db_connection(self): """Get database connection.""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row return conn def calculate_content_hash(self, content: str) -> str: """Calculate content hash for duplicate detection.""" normalized = content.strip().lower() return hashlib.md5(normalized.encode()).hexdigest() def calculate_similarity(self, content1: str, content2: str) -> float: """Calculate content similarity using difflib.""" normalized1 = content1.strip().lower() normalized2 = content2.strip().lower() # Use sequence matcher for similarity similarity = difflib.SequenceMatcher(None, normalized1, normalized2).ratio() return similarity def detect_duplicates(self, memories: List[Dict], threshold: float = 0.85) -> List[DuplicateCluster]: """Detect duplicate memories using content similarity.""" clusters = [] processed_ids = set() for i, memory1 in enumerate(memories): if memory1['memory_id'] in processed_ids: continue cluster_memories = [memory1] cluster_ids = {memory1['memory_id']} for _j, memory2 in enumerate(memories[i+1:], i+1): if memory2['memory_id'] in processed_ids: continue similarity = self.calculate_similarity(memory1['content'], memory2['content']) if similarity >= threshold: cluster_memories.append(memory2) cluster_ids.add(memory2['memory_id']) if len(cluster_memories) > 1: # Find the best quality memory (highest importance * confidence) primary = max(cluster_memories, key=lambda m: (m.get('importance', 1) * m.get('confidence', 0.5))) cluster = DuplicateCluster( cluster_id=f"dup_{memory1['memory_id'][:8]}", memory_ids=list(cluster_ids), similarity_score=max(self.calculate_similarity(memory1['content'], m['content']) for m in cluster_memories[1:]), primary_memory_id=primary['memory_id'], duplicate_count=len(cluster_memories) - 1, content_preview=memory1['content'][:100] + "..." if len(memory1['content']) > 100 else memory1['content'], metadata={ 'avg_importance': sum(m.get('importance', 1) for m in cluster_memories) / len(cluster_memories), 'avg_confidence': sum(m.get('confidence', 0.5) for m in cluster_memories) / len(cluster_memories), 'total_size': sum(len(m['content']) for m in cluster_memories) } ) clusters.append(cluster) processed_ids.update(cluster_ids) return clusters def detect_orphaned_memories(self, memories: List[Dict]) -> List[Dict]: """Detect orphaned memories not connected to any workflow or relationship.""" conn = self.get_db_connection() try: cursor = conn.cursor() orphaned = [] for memory in memories: memory_id = memory['memory_id'] # Check if memory has workflow association has_workflow = memory.get('workflow_id') is not None # Check if memory is linked to other memories cursor.execute(""" SELECT COUNT(*) as count FROM memory_links WHERE source_memory_id = ? OR target_memory_id = ? """, (memory_id, memory_id)) link_count = cursor.fetchone()['count'] # Check if memory is referenced in goals or actions cursor.execute(""" SELECT COUNT(*) as action_count FROM actions WHERE memory_id = ? OR input_data LIKE ? OR output_data LIKE ? """, (memory_id, f'%{memory_id}%', f'%{memory_id}%')) action_refs = cursor.fetchone()['action_count'] cursor.execute(""" SELECT COUNT(*) as goal_count FROM goals WHERE memory_id = ? OR description LIKE ? """, (memory_id, f'%{memory_id}%')) goal_refs = cursor.fetchone()['goal_count'] # Memory is orphaned if it has no workflow, no links, and no references if not has_workflow and link_count == 0 and action_refs == 0 and goal_refs == 0: orphaned.append({ **memory, 'orphan_score': self.calculate_orphan_score(memory), 'isolation_level': 'complete' }) elif link_count == 0 and (action_refs == 0 or goal_refs == 0): orphaned.append({ **memory, 'orphan_score': self.calculate_orphan_score(memory), 'isolation_level': 'partial' }) return orphaned finally: conn.close() def calculate_orphan_score(self, memory: Dict) -> float: """Calculate how orphaned a memory is (0-100, higher = more orphaned).""" score = 50 # Base score # Adjust based on importance (lower importance = more likely orphan) importance = memory.get('importance', 1) score += (5 - importance) * 10 # Adjust based on confidence (lower confidence = more likely orphan) confidence = memory.get('confidence', 0.5) score += (0.5 - confidence) * 50 # Adjust based on age (older = more likely to be orphaned) created_at = memory.get('created_at', time.time()) age_days = (time.time() - created_at) / 86400 if age_days > 30: score += min(20, age_days / 10) # Adjust based on access patterns access_count = memory.get('access_count', 0) if access_count == 0: score += 15 elif access_count < 3: score += 10 return min(100, max(0, score)) def analyze_memory_quality(self, memory: Dict) -> Dict: """Analyze individual memory quality.""" quality_score = 50 # Base score issues = [] content = memory.get('content', '') importance = memory.get('importance', 1) confidence = memory.get('confidence', 0.5) # Content quality checks if len(content) < 10: issues.append("Content too short") quality_score -= 20 elif len(content) > 10000: issues.append("Content extremely long") quality_score -= 10 # Check for common quality issues if content.count('\n') / max(1, len(content)) > 0.1: # Too many line breaks issues.append("Excessive line breaks") quality_score -= 5 if len(set(content.split())) / max(1, len(content.split())) < 0.3: # Low vocabulary diversity issues.append("Low vocabulary diversity") quality_score -= 10 # Importance and confidence checks if importance < 3: issues.append("Low importance rating") quality_score -= 10 if confidence < 0.3: issues.append("Low confidence rating") quality_score -= 15 # Memory type consistency memory_type = memory.get('memory_type', '') memory_level = memory.get('memory_level', '') if not memory_type: issues.append("Missing memory type") quality_score -= 15 if not memory_level: issues.append("Missing memory level") quality_score -= 15 # Check for encoding issues or corruption try: content.encode('utf-8').decode('utf-8') except UnicodeError: issues.append("Encoding corruption detected") quality_score -= 25 # Age and staleness created_at = memory.get('created_at', time.time()) age_days = (time.time() - created_at) / 86400 if age_days > 90 and memory.get('access_count', 0) == 0: issues.append("Stale memory (old and unaccessed)") quality_score -= 20 return { 'quality_score': max(0, min(100, quality_score)), 'issues': issues, 'recommendations': self.generate_quality_recommendations(memory, issues) } def generate_quality_recommendations(self, memory: Dict, issues: List[str]) -> List[str]: """Generate recommendations for improving memory quality.""" recommendations = [] if "Content too short" in issues: recommendations.append("Consider expanding content with more context or details") if "Content extremely long" in issues: recommendations.append("Consider breaking into smaller, focused memories") if "Low importance rating" in issues: recommendations.append("Review and adjust importance rating if memory is valuable") if "Low confidence rating" in issues: recommendations.append("Verify information accuracy and update confidence") if "Missing memory type" in issues: recommendations.append("Assign appropriate memory type classification") if "Stale memory (old and unaccessed)" in issues: recommendations.append("Archive or delete if no longer relevant") if "Encoding corruption detected" in issues: recommendations.append("Critical: Clean up encoding issues immediately") return recommendations async def perform_quality_analysis(self, request: QualityAnalysisRequest) -> QualityAnalysisResult: """Perform comprehensive memory quality analysis.""" start_time = time.time() conn = self.get_db_connection() try: cursor = conn.cursor() # Get all memories cursor.execute("SELECT * FROM memories ORDER BY created_at DESC") memories = [dict(row) for row in cursor.fetchall()] total_memories = len(memories) issues = [] # Detect duplicates duplicates = [] if request.analysis_type in ['comprehensive', 'duplicates']: duplicate_clusters = self.detect_duplicates(memories, request.similarity_threshold) for cluster in duplicate_clusters: issue = QualityIssue( issue_id=f"dup_{cluster.cluster_id}", issue_type="duplicate", severity="medium" if cluster.duplicate_count <= 2 else "high", memory_ids=cluster.memory_ids, title=f"Duplicate memories ({cluster.duplicate_count} duplicates)", description=f"Found {cluster.duplicate_count} duplicate memories with {cluster.similarity_score:.2%} similarity", recommendation=f"Merge duplicates into primary memory {cluster.primary_memory_id}", impact_score=cluster.duplicate_count * 10, auto_fixable=True, estimated_savings={ 'storage': len(cluster.content_preview) * cluster.duplicate_count * 0.8, 'performance': cluster.duplicate_count * 5, 'clarity': cluster.duplicate_count * 15 }, metadata=cluster.metadata ) issues.append(issue) duplicates.extend(cluster.memory_ids[1:]) # Exclude primary # Detect orphaned memories orphaned = [] if request.analysis_type in ['comprehensive', 'orphaned']: orphaned_memories = self.detect_orphaned_memories(memories) for orphan in orphaned_memories: issue = QualityIssue( issue_id=f"orphan_{orphan['memory_id'][:8]}", issue_type="orphaned", severity="low" if orphan['orphan_score'] < 70 else "medium", memory_ids=[orphan['memory_id']], title=f"Orphaned memory (isolation: {orphan['isolation_level']})", description="Memory has no connections to workflows, goals, or other memories", recommendation="Connect to relevant workflow or consider archiving", impact_score=orphan['orphan_score'], auto_fixable=orphan['isolation_level'] == 'complete' and orphan['orphan_score'] > 80, estimated_savings={ 'clarity': orphan['orphan_score'] * 0.5, 'organization': 20 }, metadata={'orphan_score': orphan['orphan_score'], 'isolation_level': orphan['isolation_level']} ) issues.append(issue) orphaned.append(orphan['memory_id']) # Analyze individual memory quality low_quality = [] corrupted = [] if request.analysis_type in ['comprehensive', 'low_quality']: for memory in memories: quality_analysis = self.analyze_memory_quality(memory) if quality_analysis['quality_score'] < 30: issue = QualityIssue( issue_id=f"quality_{memory['memory_id'][:8]}", issue_type="low_quality", severity="high" if quality_analysis['quality_score'] < 20 else "medium", memory_ids=[memory['memory_id']], title=f"Low quality memory (score: {quality_analysis['quality_score']})", description=f"Quality issues: {', '.join(quality_analysis['issues'])}", recommendation='; '.join(quality_analysis['recommendations']), impact_score=50 - quality_analysis['quality_score'], auto_fixable=False, estimated_savings={'quality': 50 - quality_analysis['quality_score']}, metadata={'quality_analysis': quality_analysis} ) issues.append(issue) low_quality.append(memory['memory_id']) # Check for corruption if "Encoding corruption detected" in quality_analysis['issues']: corrupted.append(memory['memory_id']) # Detect stale memories stale_memories = [] if request.include_stale: stale_cutoff = time.time() - (request.stale_threshold_days * 86400) for memory in memories: if (memory.get('created_at', time.time()) < stale_cutoff and memory.get('access_count', 0) == 0 and memory.get('importance', 1) < 5): issue = QualityIssue( issue_id=f"stale_{memory['memory_id'][:8]}", issue_type="stale", severity="low", memory_ids=[memory['memory_id']], title=f"Stale memory ({(time.time() - memory.get('created_at', time.time())) / 86400:.0f} days old)", description="Old memory with no recent access and low importance", recommendation="Archive or delete if no longer relevant", impact_score=min(30, (time.time() - memory.get('created_at', time.time())) / 86400 * 0.5), auto_fixable=True, estimated_savings={'storage': len(memory.get('content', ''))}, metadata={'age_days': (time.time() - memory.get('created_at', time.time())) / 86400} ) issues.append(issue) stale_memories.append(memory['memory_id']) # Calculate overall quality score issues_count = len(issues) overall_score = max(0, 100 - (issues_count * 5) - (len(duplicates) * 2) - (len(orphaned) * 1)) # Generate high-level recommendations recommendations = [] if len(duplicates) > 10: recommendations.append("High number of duplicates detected. Run bulk duplicate cleanup.") if len(orphaned) > total_memories * 0.2: recommendations.append("Many orphaned memories. Review workflow organization.") if len(low_quality) > total_memories * 0.1: recommendations.append("Quality issues detected. Review content standards.") if len(stale_memories) > 50: recommendations.append("Archive old, unused memories to improve performance.") analysis_time = time.time() - start_time return QualityAnalysisResult( total_memories=total_memories, issues_found=issues_count, duplicates=len(duplicates), orphaned=len(orphaned), low_quality=len(low_quality), stale_memories=len(stale_memories), corrupted=len(corrupted), overall_score=overall_score, issues=issues, recommendations=recommendations, analysis_time=analysis_time ) finally: conn.close() async def preview_bulk_operation(self, request: BulkOperationRequest) -> BulkOperation: """Preview bulk operation changes before execution.""" operation_id = f"bulk_{int(time.time())}" conn = self.get_db_connection() try: cursor = conn.cursor() # Get affected memories placeholders = ','.join('?' * len(request.memory_ids)) cursor.execute(f"SELECT * FROM memories WHERE memory_id IN ({placeholders})", request.memory_ids) memories = [dict(row) for row in cursor.fetchall()] preview_changes = [] estimated_impact = {'memories_affected': len(memories)} if request.operation_type == "delete": for memory in memories: preview_changes.append({ 'action': 'delete', 'memory_id': memory['memory_id'], 'content_preview': memory['content'][:100] + "..." if len(memory['content']) > 100 else memory['content'], 'impact': 'Memory will be permanently deleted' }) estimated_impact['storage_freed'] = sum(len(m['content']) for m in memories) elif request.operation_type == "merge": if request.target_memory_id: target = next((m for m in memories if m['memory_id'] == request.target_memory_id), None) if target: others = [m for m in memories if m['memory_id'] != request.target_memory_id] preview_changes.append({ 'action': 'merge_target', 'memory_id': target['memory_id'], 'impact': f'Will be kept as primary memory, enhanced with content from {len(others)} others' }) for other in others: preview_changes.append({ 'action': 'merge_source', 'memory_id': other['memory_id'], 'impact': 'Content will be merged into target, then deleted' }) elif request.operation_type == "archive": for memory in memories: preview_changes.append({ 'action': 'archive', 'memory_id': memory['memory_id'], 'impact': 'Memory will be marked as archived (soft delete)' }) return BulkOperation( operation_id=operation_id, operation_type=request.operation_type, memory_ids=request.memory_ids, preview_changes=preview_changes, estimated_impact=estimated_impact, reversible=request.operation_type in ['archive'], confirmation_required=request.operation_type in ['delete', 'merge'] ) finally: conn.close() async def execute_bulk_operation(self, operation: BulkOperation) -> Dict: """Execute bulk operation with safety checks.""" conn = self.get_db_connection() try: cursor = conn.cursor() results = {'success': 0, 'failed': 0, 'errors': []} if operation.operation_type == "delete": for memory_id in operation.memory_ids: try: # Delete related links first cursor.execute("DELETE FROM memory_links WHERE source_memory_id = ? OR target_memory_id = ?", (memory_id, memory_id)) # Delete memory cursor.execute("DELETE FROM memories WHERE memory_id = ?", (memory_id,)) results['success'] += 1 except Exception as e: results['failed'] += 1 results['errors'].append(f"Failed to delete {memory_id}: {str(e)}") elif operation.operation_type == "archive": for memory_id in operation.memory_ids: try: cursor.execute("UPDATE memories SET archived = 1 WHERE memory_id = ?", (memory_id,)) results['success'] += 1 except Exception as e: results['failed'] += 1 results['errors'].append(f"Failed to archive {memory_id}: {str(e)}") conn.commit() return results except Exception as e: conn.rollback() raise HTTPException(status_code=500, detail=f"Bulk operation failed: {str(e)}") from e finally: conn.close() class WorkingMemoryManager: """Core working memory management and optimization logic.""" def __init__(self, db_path: str = "storage/unified_agent_memory.db"): self.db_path = db_path self.active_memories: Dict[str, WorkingMemoryItem] = {} self.capacity = 7 # Miller's rule: 7±2 self.focus_mode = "normal" self.retention_time = 30 # minutes self.connected_clients: List[WebSocket] = [] def get_db_connection(self): """Get database connection.""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row return conn def calculate_memory_temperature(self, memory: Dict) -> float: """Calculate memory temperature based on access patterns.""" now = time.time() last_access = memory.get('last_accessed_at', memory.get('created_at', now)) access_count = memory.get('access_count', 0) # Recency component (decreases over time) time_since_access = now - last_access recency_score = max(0, 100 - (time_since_access / 3600) * 10) # Decreases over hours # Frequency component frequency_score = min(100, access_count * 10) # Weighted combination temperature = recency_score * 0.7 + frequency_score * 0.3 return round(temperature) def calculate_memory_priority(self, memory: Dict) -> str: """Calculate memory priority level.""" importance = memory.get('importance', 1) if importance >= 9: return 'critical' elif importance >= 7: return 'high' elif importance >= 5: return 'medium' else: return 'low' def calculate_access_frequency(self, memory: Dict) -> float: """Calculate normalized access frequency.""" access_count = memory.get('access_count', 0) return min(10, access_count / 5) # Normalized to 0-10 scale def calculate_retention_score(self, memory: Dict) -> float: """Calculate how likely memory should remain in working memory.""" importance = memory.get('importance', 1) confidence = memory.get('confidence', 0.5) access_count = memory.get('access_count', 0) score = (importance * 0.4 + confidence * 100 * 0.3 + min(access_count * 10, 100) * 0.3) / 10 return round(score, 2) def enhance_memory_for_working_memory(self, memory: Dict) -> WorkingMemoryItem: """Convert database memory to enhanced working memory item.""" return WorkingMemoryItem( memory_id=memory['memory_id'], content=memory['content'], memory_type=memory['memory_type'], memory_level=memory['memory_level'], importance=memory['importance'], confidence=memory.get('confidence', 0.5), created_at=memory['created_at'], last_accessed_at=memory.get('last_accessed_at'), access_count=memory.get('access_count', 0), workflow_id=memory.get('workflow_id'), temperature=self.calculate_memory_temperature(memory), priority=self.calculate_memory_priority(memory), access_frequency=self.calculate_access_frequency(memory), retention_score=self.calculate_retention_score(memory), added_at=time.time() ) def calculate_focus_score(self) -> float: """Calculate current focus score based on working memory coherence.""" if not self.active_memories: return 100.0 memories = list(self.active_memories.values()) # Calculate average importance avg_importance = sum(m.importance for m in memories) / len(memories) # Calculate diversity penalty type_variety = len(set(m.memory_type for m in memories)) level_variety = len(set(m.memory_level for m in memories)) # Lower variety = higher focus variety_penalty = (type_variety + level_variety) * 5 importance_bonus = avg_importance * 10 focus_score = max(0, min(100, importance_bonus - variety_penalty + 20)) return round(focus_score, 1) def calculate_efficiency(self) -> float: """Calculate working memory efficiency.""" if not self.active_memories: return 100.0 memories = list(self.active_memories.values()) # Average temperature (activity level) avg_temperature = sum(m.temperature for m in memories) / len(memories) # Utilization rate utilization = (len(memories) / self.capacity) * 100 # Optimal utilization is around 70% optimal_utilization = 100 - abs(utilization - 70) if abs(utilization - 70) < 30 else 70 efficiency = (avg_temperature * 0.6 + optimal_utilization * 0.4) return round(efficiency) def get_working_memory_stats(self) -> WorkingMemoryStats: """Get current working memory statistics.""" memories = list(self.active_memories.values()) return WorkingMemoryStats( active_count=len(memories), capacity=self.capacity, pressure=round((len(memories) / self.capacity) * 100), temperature=round(sum(m.temperature for m in memories) / len(memories)) if memories else 0, focus_score=self.calculate_focus_score(), efficiency=self.calculate_efficiency(), avg_retention_time=round(sum(m.retention_score for m in memories) / len(memories)) if memories else 0, total_accesses=sum(m.access_count for m in memories), last_updated=time.time() ) def generate_optimization_suggestions(self) -> List[OptimizationSuggestion]: """Generate optimization suggestions based on current state.""" suggestions = [] stats = self.get_working_memory_stats() memories = list(self.active_memories.values()) # High pressure suggestion if stats.pressure > 80: suggestions.append(OptimizationSuggestion( id="reduce-pressure", title="Reduce Memory Pressure", description="Working memory is near capacity. Consider removing lower priority items.", priority="high", impact="High", icon="alert-triangle", action="Auto-Remove", confidence=0.9, estimated_improvement={"pressure": -20, "efficiency": 15} )) # Cold memories suggestion cold_memories = [m for m in memories if m.temperature < 30] if cold_memories: suggestions.append(OptimizationSuggestion( id="remove-cold", title="Remove Stale Memories", description=f"{len(cold_memories)} memories haven't been accessed recently.", priority="medium", impact="Medium", icon="snowflake", action="Clear Stale", confidence=0.8, estimated_improvement={"temperature": 15, "efficiency": 10} )) # Low focus suggestion if stats.focus_score < 50: suggestions.append(OptimizationSuggestion( id="improve-focus", title="Improve Focus", description="Working memory contains diverse, unrelated items. Consider focusing on a single task.", priority="medium", impact="High", icon="target", action="Focus Mode", confidence=0.7, estimated_improvement={"focus_score": 30, "efficiency": 20} )) # Underutilization suggestion if stats.active_count < self.capacity / 2: suggestions.append(OptimizationSuggestion( id="add-related", title="Add Related Memories", description="Working memory has capacity for more relevant items.", priority="low", impact="Medium", icon="plus-circle", action="Add Related", confidence=0.6, estimated_improvement={"efficiency": 10, "focus_score": 5} )) return suggestions async def load_initial_working_memory(self) -> List[WorkingMemoryItem]: """Load initial working memory with high-importance memories.""" conn = self.get_db_connection() try: cursor = conn.cursor() # Get high-importance or working-level memories cursor.execute(""" SELECT * FROM memories WHERE memory_level = 'working' OR importance >= 8 ORDER BY created_at DESC, importance DESC LIMIT ? """, (self.capacity,)) memories = [] for row in cursor.fetchall(): memory_dict = dict(row) enhanced_memory = self.enhance_memory_for_working_memory(memory_dict) memories.append(enhanced_memory) self.active_memories[enhanced_memory.memory_id] = enhanced_memory return memories finally: conn.close() async def add_to_working_memory(self, memory_id: str) -> bool: """Add a memory to working memory.""" if len(self.active_memories) >= self.capacity: return False if memory_id in self.active_memories: return False conn = self.get_db_connection() try: cursor = conn.cursor() cursor.execute("SELECT * FROM memories WHERE memory_id = ?", (memory_id,)) row = cursor.fetchone() if not row: return False memory_dict = dict(row) enhanced_memory = self.enhance_memory_for_working_memory(memory_dict) self.active_memories[memory_id] = enhanced_memory # Broadcast update to connected clients await self.broadcast_update() return True finally: conn.close() async def remove_from_working_memory(self, memory_id: str) -> bool: """Remove a memory from working memory.""" if memory_id not in self.active_memories: return False del self.active_memories[memory_id] # Broadcast update to connected clients await self.broadcast_update() return True async def clear_working_memory(self): """Clear all working memory.""" self.active_memories.clear() await self.broadcast_update() async def apply_focus_mode(self, mode: str, retention_time: Optional[int] = None, max_memory: Optional[int] = None): """Apply focus mode settings.""" mode_settings = { 'deep': {'capacity': 5, 'retention': 60}, 'creative': {'capacity': 9, 'retention': 45}, 'analytical': {'capacity': 6, 'retention': 90}, 'maintenance': {'capacity': 3, 'retention': 20}, 'normal': {'capacity': 7, 'retention': 30} } settings = mode_settings.get(mode, mode_settings['normal']) self.focus_mode = mode self.capacity = max_memory or settings['capacity'] self.retention_time = retention_time or settings['retention'] # If we're over capacity, remove lowest priority memories if len(self.active_memories) > self.capacity: memories_by_priority = sorted( self.active_memories.values(), key=lambda m: (m.importance, m.retention_score), reverse=True ) # Keep only the top memories to_keep = memories_by_priority[:self.capacity] self.active_memories = {m.memory_id: m for m in to_keep} await self.broadcast_update() async def auto_optimize(self) -> List[str]: """Apply automatic optimizations.""" applied_optimizations = [] suggestions = self.generate_optimization_suggestions() for suggestion in suggestions: if suggestion.priority in ['medium', 'low'] and suggestion.confidence > 0.7: success = await self.apply_optimization(suggestion.id) if success: applied_optimizations.append(suggestion.title) return applied_optimizations async def apply_optimization(self, suggestion_id: str) -> bool: """Apply a specific optimization.""" memories = list(self.active_memories.values()) if suggestion_id == "reduce-pressure": # Remove lowest priority memories low_priority = [m for m in memories if m.priority == 'low'] for memory in low_priority[:2]: await self.remove_from_working_memory(memory.memory_id) return True elif suggestion_id == "remove-cold": # Remove cold memories cold_memories = [m for m in memories if m.temperature < 30] for memory in cold_memories[:3]: await self.remove_from_working_memory(memory.memory_id) return True elif suggestion_id == "improve-focus": # Switch to deep focus mode await self.apply_focus_mode('deep') return True elif suggestion_id == "add-related": # Add related memories await self.add_related_memories() return True return False async def add_related_memories(self): """Add memories related to current working memory.""" if not self.active_memories or len(self.active_memories) >= self.capacity: return current_types = set(m.memory_type for m in self.active_memories.values()) current_workflows = set(m.workflow_id for m in self.active_memories.values() if m.workflow_id) conn = self.get_db_connection() try: cursor = conn.cursor() # Find related memories placeholders = ','.join('?' * len(current_types)) if current_types else "''" workflow_placeholders = ','.join('?' * len(current_workflows)) if current_workflows else "''" query = f""" SELECT * FROM memories WHERE memory_id NOT IN ({','.join('?' * len(self.active_memories))}) AND (memory_type IN ({placeholders}) OR workflow_id IN ({workflow_placeholders})) AND importance >= 6 ORDER BY importance DESC LIMIT ? """ params = ( list(self.active_memories.keys()) + list(current_types) + list(current_workflows) + [self.capacity - len(self.active_memories)] ) cursor.execute(query, params) for row in cursor.fetchall(): memory_dict = dict(row) enhanced_memory = self.enhance_memory_for_working_memory(memory_dict) self.active_memories[enhanced_memory.memory_id] = enhanced_memory if len(self.active_memories) >= self.capacity: break finally: conn.close() await self.broadcast_update() def get_memory_pool(self, search: str = "", filter_type: str = "", limit: int = 50) -> List[Dict]: """Get available memory pool for working memory.""" conn = self.get_db_connection() try: cursor = conn.cursor() # Build query where_conditions = ["memory_id NOT IN ({})".format(','.join('?' * len(self.active_memories)))] params = list(self.active_memories.keys()) if search: where_conditions.append("(content LIKE ? OR memory_type LIKE ?)") params.extend([f"%{search}%", f"%{search}%"]) if filter_type == "high": where_conditions.append("importance >= 8") elif filter_type == "recent": day_ago = time.time() - 86400 where_conditions.append("created_at > ?") params.append(day_ago) elif filter_type == "related" and self.active_memories: current_types = set(m.memory_type for m in self.active_memories.values()) current_workflows = set(m.workflow_id for m in self.active_memories.values() if m.workflow_id) if current_types or current_workflows: type_placeholders = ','.join('?' * len(current_types)) if current_types else "''" workflow_placeholders = ','.join('?' * len(current_workflows)) if current_workflows else "''" where_conditions.append(f"(memory_type IN ({type_placeholders}) OR workflow_id IN ({workflow_placeholders}))") params.extend(list(current_types) + list(current_workflows)) query = f""" SELECT * FROM memories WHERE {' AND '.join(where_conditions)} ORDER BY importance DESC LIMIT ? """ params.append(limit) cursor.execute(query, params) memories = [] for row in cursor.fetchall(): memory_dict = dict(row) memory_dict['access_frequency'] = self.calculate_access_frequency(memory_dict) memories.append(memory_dict) return memories finally: conn.close() def generate_heatmap_data(self, timeframe: str = "24h") -> List[Dict]: """Generate memory activity heatmap data.""" now = time.time() intervals = [] # Configure timeframe timeframe_config = { '1h': {'seconds': 300, 'count': 12}, # 5 minute intervals '6h': {'seconds': 1800, 'count': 12}, # 30 minute intervals '24h': {'seconds': 3600, 'count': 24}, # 1 hour intervals '7d': {'seconds': 86400, 'count': 7} # 1 day intervals } config = timeframe_config.get(timeframe, timeframe_config['24h']) interval_seconds = config['seconds'] interval_count = config['count'] conn = self.get_db_connection() try: cursor = conn.cursor() for i in range(interval_count): interval_start = now - (interval_count - i) * interval_seconds interval_end = interval_start + interval_seconds # Count activities in this interval cursor.execute(""" SELECT COUNT(*) as activity_count FROM memories WHERE created_at >= ? AND created_at <= ? """, (interval_start, interval_end)) activity_count = cursor.fetchone()[0] intervals.append({ 'time': interval_start, 'activity': activity_count, 'intensity': min(1.0, activity_count / 10) # Normalize to 0-1 }) return intervals finally: conn.close() async def register_client(self, websocket: WebSocket): """Register a WebSocket client for real-time updates.""" self.connected_clients.append(websocket) async def unregister_client(self, websocket: WebSocket): """Unregister a WebSocket client.""" if websocket in self.connected_clients: self.connected_clients.remove(websocket) async def broadcast_update(self): """Broadcast working memory update to all connected clients.""" if not self.connected_clients: return update_data = { 'type': 'working_memory_update', 'stats': asdict(self.get_working_memory_stats()), 'active_memories': [asdict(m) for m in self.active_memories.values()], 'suggestions': [asdict(s) for s in self.generate_optimization_suggestions()], 'timestamp': time.time() } # Send to all connected clients disconnected_clients = [] for client in self.connected_clients: try: await client.send_text(json.dumps(update_data)) except Exception: disconnected_clients.append(client) # Remove disconnected clients for client in disconnected_clients: await self.unregister_client(client) # Global working memory manager instance working_memory_manager = WorkingMemoryManager() # Global memory quality inspector instance memory_quality_inspector = MemoryQualityInspector() def setup_working_memory_routes(app: FastAPI): """Setup working memory API routes.""" @app.get("/api/working-memory/status") async def get_working_memory_status(): """Get current working memory status and statistics.""" try: stats = working_memory_manager.get_working_memory_stats() active_memories = [asdict(m) for m in working_memory_manager.active_memories.values()] suggestions = [asdict(s) for s in working_memory_manager.generate_optimization_suggestions()] return { 'status': 'connected', 'stats': asdict(stats), 'active_memories': active_memories, 'suggestions': suggestions, 'focus_mode': working_memory_manager.focus_mode, 'capacity': working_memory_manager.capacity, 'retention_time': working_memory_manager.retention_time } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.post("/api/working-memory/initialize") async def initialize_working_memory(): """Initialize working memory with default high-importance memories.""" try: memories = await working_memory_manager.load_initial_working_memory() stats = working_memory_manager.get_working_memory_stats() return { 'success': True, 'message': f'Initialized with {len(memories)} memories', 'stats': asdict(stats), 'active_memories': [asdict(m) for m in memories] } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.post("/api/working-memory/add") async def add_memory_to_working_memory(request: WorkingMemoryRequest): """Add a memory to working memory.""" try: success = await working_memory_manager.add_to_working_memory(request.memory_id) if success: stats = working_memory_manager.get_working_memory_stats() return { 'success': True, 'message': 'Memory added to working memory', 'stats': asdict(stats) } else: return { 'success': False, 'message': 'Could not add memory (capacity reached or already exists)' } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.post("/api/working-memory/remove") async def remove_memory_from_working_memory(request: WorkingMemoryRequest): """Remove a memory from working memory.""" try: success = await working_memory_manager.remove_from_working_memory(request.memory_id) if success: stats = working_memory_manager.get_working_memory_stats() return { 'success': True, 'message': 'Memory removed from working memory', 'stats': asdict(stats) } else: return { 'success': False, 'message': 'Memory not found in working memory' } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.post("/api/working-memory/clear") async def clear_working_memory(): """Clear all working memory.""" try: await working_memory_manager.clear_working_memory() stats = working_memory_manager.get_working_memory_stats() return { 'success': True, 'message': 'Working memory cleared', 'stats': asdict(stats) } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.post("/api/working-memory/focus-mode") async def set_focus_mode(request: FocusModeRequest): """Set focus mode and apply related optimizations.""" try: await working_memory_manager.apply_focus_mode( request.mode, request.retention_time, request.max_working_memory ) stats = working_memory_manager.get_working_memory_stats() return { 'success': True, 'message': f'Applied {request.mode} focus mode', 'focus_mode': working_memory_manager.focus_mode, 'capacity': working_memory_manager.capacity, 'retention_time': working_memory_manager.retention_time, 'stats': asdict(stats) } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.post("/api/working-memory/optimize") async def optimize_working_memory(): """Apply automatic working memory optimizations.""" try: applied = await working_memory_manager.auto_optimize() stats = working_memory_manager.get_working_memory_stats() return { 'success': True, 'message': f'Applied {len(applied)} optimizations', 'optimizations_applied': applied, 'stats': asdict(stats) } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.post("/api/working-memory/apply-suggestion") async def apply_optimization_suggestion(request: OptimizationRequest): """Apply a specific optimization suggestion.""" try: success = await working_memory_manager.apply_optimization(request.suggestion_id) if success: stats = working_memory_manager.get_working_memory_stats() suggestions = [asdict(s) for s in working_memory_manager.generate_optimization_suggestions()] return { 'success': True, 'message': 'Optimization applied successfully', 'stats': asdict(stats), 'suggestions': suggestions } else: return { 'success': False, 'message': 'Could not apply optimization' } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.get("/api/working-memory/pool") async def get_memory_pool( search: str = "", filter_type: str = "", # "", "high", "recent", "related" limit: int = 50 ): """Get available memory pool for working memory.""" try: memories = working_memory_manager.get_memory_pool(search, filter_type, limit) return { 'success': True, 'memories': memories, 'count': len(memories) } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.get("/api/working-memory/heatmap") async def get_memory_heatmap(timeframe: str = "24h"): """Get memory activity heatmap data.""" try: heatmap_data = working_memory_manager.generate_heatmap_data(timeframe) return { 'success': True, 'timeframe': timeframe, 'data': heatmap_data } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @app.websocket("/ws/working-memory") async def working_memory_websocket(websocket: WebSocket): """WebSocket endpoint for real-time working memory updates.""" await websocket.accept() await working_memory_manager.register_client(websocket) try: # Send initial data initial_data = { 'type': 'initial_data', 'stats': asdict(working_memory_manager.get_working_memory_stats()), 'active_memories': [asdict(m) for m in working_memory_manager.active_memories.values()], 'suggestions': [asdict(s) for s in working_memory_manager.generate_optimization_suggestions()], 'focus_mode': working_memory_manager.focus_mode, 'capacity': working_memory_manager.capacity } await websocket.send_text(json.dumps(initial_data)) # Keep connection alive and handle messages while True: try: # Wait for messages from client data = await websocket.receive_text() message = json.loads(data) # Handle different message types if message.get('type') == 'ping': await websocket.send_text(json.dumps({'type': 'pong'})) except WebSocketDisconnect: break except Exception as e: print(f"WebSocket error: {e}") break finally: await working_memory_manager.unregister_client(websocket) # Memory Quality Inspector API Endpoints @app.post("/api/memory-quality/analyze") async def analyze_memory_quality(request: QualityAnalysisRequest): """Perform comprehensive memory quality analysis.""" try: result = await memory_quality_inspector.perform_quality_analysis(request) return { 'success': True, 'analysis': asdict(result) } except Exception as e: raise HTTPException(status_code=500, detail=f"Quality analysis failed: {str(e)}") from e @app.get("/api/memory-quality/quick-scan") async def quick_quality_scan(): """Perform quick quality scan with basic metrics.""" try: request = QualityAnalysisRequest( analysis_type="comprehensive", include_stale=False, include_low_importance=False, similarity_threshold=0.90, stale_threshold_days=7 ) result = await memory_quality_inspector.perform_quality_analysis(request) # Return simplified metrics for quick overview return { 'success': True, 'quick_metrics': { 'total_memories': result.total_memories, 'overall_score': result.overall_score, 'critical_issues': len([i for i in result.issues if i.severity == 'critical']), 'duplicates': result.duplicates, 'orphaned': result.orphaned, 'low_quality': result.low_quality, 'top_recommendations': result.recommendations[:3] } } except Exception as e: raise HTTPException(status_code=500, detail=f"Quick scan failed: {str(e)}") from e @app.post("/api/memory-quality/bulk-preview") async def preview_bulk_operation(request: BulkOperationRequest): """Preview bulk operation changes before execution.""" try: operation = await memory_quality_inspector.preview_bulk_operation(request) return { 'success': True, 'operation': asdict(operation) } except Exception as e: raise HTTPException(status_code=500, detail=f"Bulk preview failed: {str(e)}") from e @app.post("/api/memory-quality/bulk-execute") async def execute_bulk_operation(operation_request: BulkOperationRequest): """Execute bulk operation with safety checks.""" try: # First preview the operation operation = await memory_quality_inspector.preview_bulk_operation(operation_request) # Execute the operation results = await memory_quality_inspector.execute_bulk_operation(operation) return { 'success': True, 'operation_id': operation.operation_id, 'results': results, 'message': f"Bulk operation completed: {results['success']} successful, {results['failed']} failed" } except Exception as e: raise HTTPException(status_code=500, detail=f"Bulk operation failed: {str(e)}") from e @app.get("/api/memory-quality/duplicates") async def get_duplicates(): """Get all duplicate memory clusters.""" try: conn = memory_quality_inspector.get_db_connection() try: cursor = conn.cursor() cursor.execute("SELECT * FROM memories ORDER BY created_at DESC") memories = [dict(row) for row in cursor.fetchall()] finally: conn.close() clusters = memory_quality_inspector.detect_duplicates(memories, threshold=0.85) return { 'success': True, 'clusters': [asdict(cluster) for cluster in clusters], 'total_clusters': len(clusters), 'total_duplicates': sum(cluster.duplicate_count for cluster in clusters) } except Exception as e: raise HTTPException(status_code=500, detail=f"Duplicate detection failed: {str(e)}") from e @app.get("/api/memory-quality/orphaned") async def get_orphaned_memories(): """Get all orphaned memories.""" try: conn = memory_quality_inspector.get_db_connection() try: cursor = conn.cursor() cursor.execute("SELECT * FROM memories ORDER BY created_at DESC") memories = [dict(row) for row in cursor.fetchall()] finally: conn.close() orphaned = memory_quality_inspector.detect_orphaned_memories(memories) return { 'success': True, 'orphaned_memories': orphaned, 'total_orphaned': len(orphaned), 'completely_isolated': len([m for m in orphaned if m['isolation_level'] == 'complete']), 'partially_isolated': len([m for m in orphaned if m['isolation_level'] == 'partial']) } except Exception as e: raise HTTPException(status_code=500, detail=f"Orphaned memory detection failed: {str(e)}") from e @app.get("/api/memory-quality/stats") async def get_quality_stats(): """Get overall memory quality statistics.""" try: conn = memory_quality_inspector.get_db_connection() try: cursor = conn.cursor() # Basic stats cursor.execute("SELECT COUNT(*) as total FROM memories") total_memories = cursor.fetchone()['total'] cursor.execute("SELECT AVG(importance) as avg_importance, AVG(confidence) as avg_confidence FROM memories") quality_metrics = cursor.fetchone() cursor.execute("SELECT COUNT(*) as with_workflow FROM memories WHERE workflow_id IS NOT NULL") with_workflow = cursor.fetchone()['with_workflow'] cursor.execute("SELECT COUNT(*) as recent FROM memories WHERE created_at > ?", (time.time() - 86400 * 7,)) recent_memories = cursor.fetchone()['recent'] # Quality distribution cursor.execute(""" SELECT SUM(CASE WHEN importance >= 8 THEN 1 ELSE 0 END) as high_importance, SUM(CASE WHEN importance >= 5 THEN 1 ELSE 0 END) as medium_importance, SUM(CASE WHEN confidence >= 0.8 THEN 1 ELSE 0 END) as high_confidence, SUM(CASE WHEN confidence >= 0.5 THEN 1 ELSE 0 END) as medium_confidence FROM memories """) quality_dist = cursor.fetchone() finally: conn.close() return { 'success': True, 'stats': { 'total_memories': total_memories, 'avg_importance': round(quality_metrics['avg_importance'], 2), 'avg_confidence': round(quality_metrics['avg_confidence'], 2), 'workflow_coverage': round(with_workflow / max(1, total_memories) * 100, 1), 'recent_activity': recent_memories, 'quality_distribution': { 'high_importance': quality_dist['high_importance'], 'medium_importance': quality_dist['medium_importance'], 'high_confidence': quality_dist['high_confidence'], 'medium_confidence': quality_dist['medium_confidence'] } } } except Exception as e: raise HTTPException(status_code=500, detail=f"Stats collection failed: {str(e)}") from e # Background task to periodically update working memory async def working_memory_background_task(): """Background task for periodic working memory updates.""" while True: try: # Update temperatures and stats periodically for memory in working_memory_manager.active_memories.values(): # Recalculate temperature based on current time memory.temperature = working_memory_manager.calculate_memory_temperature(asdict(memory)) # Broadcast updates if there are connected clients if working_memory_manager.connected_clients: await working_memory_manager.broadcast_update() # Wait 30 seconds before next update await asyncio.sleep(30) except Exception as e: print(f"Background task error: {e}") await asyncio.sleep(60) # Wait longer if there's an error def start_background_tasks(app: FastAPI): """Start background tasks for working memory management.""" @app.on_event("startup") async def startup_event(): # Start background task asyncio.create_task(working_memory_background_task()) # Initialize working memory with default data try: await working_memory_manager.load_initial_working_memory() print("✅ Working memory initialized successfully") except Exception as e: print(f"⚠️ Could not initialize working memory: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Kappasig920/Ultimate-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server