Skip to main content
Glama

MCP Standards

by airmcp-com
enhanced_server.py22.9 kB
"""Enhanced Claude Memory MCP Server with Self-Learning Capabilities Integrates: 1. Hooks-based automatic learning 2. Pattern extraction and preference learning 3. CLAUDE.md auto-generation 4. Agent performance tracking 5. Validation gates 6. Cost-optimized model routing via agentic-flow All simple operations routed to cheap models (99.5% cost savings) while maintaining premium quality for complex reasoning. """ import asyncio import sqlite3 import json from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional from mcp.server import Server from mcp.types import Tool, TextContent # Import existing components from .autolog import get_autologger from .export import export_to_markdown # Import new self-learning components from .schema_migration import SchemaMigration from .model_router import get_model_router, get_agentic_flow_integration # Import intelligence layer from .hooks.pattern_extractor import PatternExtractor from .intelligence.claudemd_manager import ClaudeMdManager from .intelligence.temporal_graph import TemporalKnowledgeGraph from .intelligence.validation_engine import ValidationEngine from .intelligence.agent_tracker import AgentPerformanceTracker class EnhancedClaudeMemoryMCP: """Enhanced Claude-native knowledge storage with self-learning""" def __init__(self, db_path: Optional[str] = None): self.server = Server("mcp-standards-enhanced") # Use centralized location if db_path is None: self.db_path = Path.home() / ".mcp-standards" / "knowledge.db" else: self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) # Run database migrations migration = SchemaMigration(self.db_path) if not migration.migrate(): print("Warning: Database migration failed") # Initialize components self.autologger = get_autologger(self.db_path) self.pattern_extractor = PatternExtractor(self.db_path) self.claudemd_manager = ClaudeMdManager(self.db_path) self.temporal_graph = TemporalKnowledgeGraph(self.db_path) self.validation_engine = ValidationEngine(self.db_path) self.agent_tracker = AgentPerformanceTracker(self.db_path) # Initialize cost optimization self.model_router = get_model_router() self.agentic_flow = get_agentic_flow_integration() self._setup_handlers() def _setup_handlers(self): """Setup MCP protocol handlers""" @self.server.list_tools() async def list_tools() -> List[Tool]: """List available tools""" return [ # Original tools Tool( name="add_episode", description="Add knowledge episode to memory (uses cost-efficient model)", inputSchema={ "type": "object", "properties": { "name": {"type": "string", "description": "Episode name"}, "content": {"type": "string", "description": "Episode content"}, "source": {"type": "string", "description": "Source", "default": "user"}, }, "required": ["name", "content"], }, ), Tool( name="search_episodes", description="Search knowledge episodes (uses cost-efficient model)", inputSchema={ "type": "object", "properties": { "query": {"type": "string", "description": "Search query"}, "limit": {"type": "integer", "description": "Max results", "default": 10}, }, "required": ["query"], }, ), Tool( name="list_recent", description="List recent episodes (uses cost-efficient model)", inputSchema={ "type": "object", "properties": { "limit": {"type": "integer", "description": "Max results", "default": 10}, }, }, ), # New self-learning tools Tool( name="learn_preference", description="Manually add a learned preference to memory", inputSchema={ "type": "object", "properties": { "category": {"type": "string", "description": "Category (e.g., 'python-package')"}, "context": {"type": "string", "description": "When to apply this preference"}, "preference": {"type": "string", "description": "The preference/rule to apply"}, "project_specific": {"type": "boolean", "description": "Project-specific?", "default": False}, "project_path": {"type": "string", "description": "Project path (if project_specific)"}, }, "required": ["category", "context", "preference"], }, ), Tool( name="suggest_claudemd_update", description="Get suggested CLAUDE.md updates based on learned patterns", inputSchema={ "type": "object", "properties": { "project_path": {"type": "string", "description": "Project path (optional)"}, "min_confidence": {"type": "number", "description": "Min confidence", "default": 0.7}, }, }, ), Tool( name="generate_claudemd", description="Generate CLAUDE.md content from learned preferences", inputSchema={ "type": "object", "properties": { "project_path": {"type": "string", "description": "Project path (optional)"}, "output_file": {"type": "string", "description": "File to write to (optional)"}, "min_confidence": {"type": "number", "description": "Min confidence", "default": 0.7}, }, }, ), Tool( name="validate_spec", description="Validate deliverable against specification", inputSchema={ "type": "object", "properties": { "task_description": {"type": "string", "description": "Original task/spec"}, "deliverable_summary": {"type": "string", "description": "What was built"}, }, "required": ["task_description", "deliverable_summary"], }, ), Tool( name="check_quality_gates", description="Check if quality gates are satisfied", inputSchema={ "type": "object", "properties": { "project_path": {"type": "string", "description": "Project path"}, "gate_types": {"type": "array", "items": {"type": "string"}, "description": "Gate types to check"}, }, "required": ["project_path"], }, ), Tool( name="get_agent_suggestions", description="Get agent recommendations based on task and performance history", inputSchema={ "type": "object", "properties": { "task_category": {"type": "string", "description": "Type of task"}, "context": {"type": "string", "description": "Additional context"}, }, "required": ["task_category"], }, ), Tool( name="query_agent_performance", description="Query agent performance statistics", inputSchema={ "type": "object", "properties": { "agent_type": {"type": "string", "description": "Agent type (optional)"}, "task_category": {"type": "string", "description": "Task category (optional)"}, "time_range_days": {"type": "integer", "description": "Days to look back", "default": 30}, }, }, ), Tool( name="log_agent_execution", description="Log agent execution for performance tracking", inputSchema={ "type": "object", "properties": { "agent_type": {"type": "string", "description": "Agent type"}, "task_category": {"type": "string", "description": "Task category"}, "success": {"type": "boolean", "description": "Success?"}, "correction_count": {"type": "integer", "description": "Corrections needed", "default": 0}, "execution_time": {"type": "number", "description": "Time in seconds"}, "project_path": {"type": "string", "description": "Project path"}, }, "required": ["agent_type", "task_category", "success"], }, ), Tool( name="get_cost_savings", description="Get cost savings report from model routing", inputSchema={ "type": "object", "properties": {}, }, ), Tool( name="export_to_markdown", description="Export knowledge base to markdown files", inputSchema={ "type": "object", "properties": { "export_path": {"type": "string", "description": "Export path (optional)"}, }, }, ), ] @self.server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: """Handle tool calls with intelligent model routing""" # Route operation to appropriate model routing = self.model_router.route_task(name) # Original tools if name == "add_episode": result = await self._add_episode( arguments["name"], arguments["content"], arguments.get("source", "user"), ) result["routing"] = routing # Include routing info return [TextContent(type="text", text=json.dumps(result))] elif name == "search_episodes": results = await self._search_episodes( arguments["query"], arguments.get("limit", 10) ) results["routing"] = routing return [TextContent(type="text", text=json.dumps(results))] elif name == "list_recent": results = await self._list_recent(arguments.get("limit", 10)) results["routing"] = routing return [TextContent(type="text", text=json.dumps(results))] # New self-learning tools elif name == "learn_preference": result = await self._learn_preference( arguments["category"], arguments["context"], arguments["preference"], arguments.get("project_specific", False), arguments.get("project_path", "") ) return [TextContent(type="text", text=json.dumps(result))] elif name == "suggest_claudemd_update": suggestions = self.claudemd_manager.suggest_updates( project_path=arguments.get("project_path"), min_confidence=arguments.get("min_confidence", 0.7) ) return [TextContent(type="text", text=json.dumps({ "suggestions": suggestions, "count": len(suggestions) }))] elif name == "generate_claudemd": content = self.claudemd_manager.generate_claudemd_content( project_path=arguments.get("project_path"), min_confidence=arguments.get("min_confidence", 0.7) ) # Optionally write to file output_file = arguments.get("output_file") if output_file: file_path = Path(output_file) success, message = self.claudemd_manager.update_claudemd_file( file_path, arguments.get("project_path"), arguments.get("min_confidence", 0.7) ) return [TextContent(type="text", text=json.dumps({ "success": success, "message": message, "content": content }))] return [TextContent(type="text", text=json.dumps({ "content": content }))] elif name == "validate_spec": # Capture spec first spec_id = self.validation_engine.capture_spec( arguments["task_description"], "" ) # Validate result = self.validation_engine.validate_spec( spec_id, arguments["deliverable_summary"] ) return [TextContent(type="text", text=json.dumps(result))] elif name == "check_quality_gates": result = self.validation_engine.check_quality_gates( arguments["project_path"], arguments.get("gate_types") ) return [TextContent(type="text", text=json.dumps(result))] elif name == "get_agent_suggestions": suggestions = self.agent_tracker.recommend_agent( arguments["task_category"], arguments.get("context") ) return [TextContent(type="text", text=json.dumps({ "suggestions": suggestions, "task_category": arguments["task_category"] }))] elif name == "query_agent_performance": stats = self.agent_tracker.get_agent_stats( agent_type=arguments.get("agent_type"), task_category=arguments.get("task_category"), time_range_days=arguments.get("time_range_days", 30) ) return [TextContent(type="text", text=json.dumps(stats))] elif name == "log_agent_execution": log_id = self.agent_tracker.log_agent_execution( arguments["agent_type"], arguments["task_category"], arguments["success"], arguments.get("correction_count", 0), arguments.get("execution_time"), None, arguments.get("project_path", "") ) return [TextContent(type="text", text=json.dumps({ "success": True, "log_id": log_id }))] elif name == "get_cost_savings": report = self.agentic_flow.get_cost_report() return [TextContent(type="text", text=json.dumps(report))] elif name == "export_to_markdown": result = await self._export_to_markdown( arguments.get("export_path") ) return [TextContent(type="text", text=json.dumps(result))] else: return [TextContent(type="text", text=f"Unknown tool: {name}")] # Original methods (unchanged) async def _add_episode(self, name: str, content: str, source: str = "user") -> Dict[str, Any]: """Add episode to memory""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute( "INSERT INTO episodes (name, content, source) VALUES (?, ?, ?)", (name, content, source) ) episode_id = cursor.lastrowid # Update search index conn.execute( "INSERT INTO episodes_search (rowid, name, content, source) VALUES (?, ?, ?, ?)", (episode_id, name, content, source) ) conn.commit() return { "success": True, "id": episode_id, "message": f"Episode '{name}' added successfully" } except Exception as e: return {"success": False, "error": str(e)} async def _search_episodes(self, query: str, limit: int = 10) -> Dict[str, Any]: """Search episodes using FTS""" try: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row try: cursor = conn.execute(""" SELECT e.id, e.name, e.content, e.source, e.timestamp, rank FROM episodes_search JOIN episodes e ON episodes_search.rowid = e.id WHERE episodes_search MATCH ? ORDER BY rank LIMIT ? """, (query, limit)) except sqlite3.OperationalError: cursor = conn.execute(""" SELECT id, name, content, source, timestamp FROM episodes WHERE name LIKE ? OR content LIKE ? ORDER BY timestamp DESC LIMIT ? """, (f"%{query}%", f"%{query}%", limit)) episodes = [dict(row) for row in cursor.fetchall()] return { "success": True, "query": query, "results": episodes, "count": len(episodes) } except Exception as e: return {"success": False, "error": str(e)} async def _list_recent(self, limit: int = 10) -> Dict[str, Any]: """List recent episodes""" try: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.execute(""" SELECT id, name, content, source, timestamp FROM episodes ORDER BY timestamp DESC LIMIT ? """, (limit,)) episodes = [dict(row) for row in cursor.fetchall()] return { "success": True, "results": episodes, "count": len(episodes) } except Exception as e: return {"success": False, "error": str(e)} async def _learn_preference( self, category: str, context: str, preference: str, project_specific: bool = False, project_path: str = "" ) -> Dict[str, Any]: """Manually add a learned preference""" try: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute(""" INSERT INTO tool_preferences ( category, context, preference, confidence, project_specific, project_path, created_at ) VALUES (?, ?, ?, 1.0, ?, ?, ?) """, ( category, context, preference, project_specific, project_path, datetime.now().isoformat() )) pref_id = cursor.lastrowid # Record in temporal graph self.temporal_graph.record_preference_change( pref_id, "created", reason="manually added" ) conn.commit() return { "success": True, "preference_id": pref_id, "message": f"Preference added: {preference}" } except Exception as e: return {"success": False, "error": str(e)} async def _export_to_markdown(self, export_path: Optional[str] = None) -> Dict[str, Any]: """Export knowledge base to markdown""" try: if export_path: path = Path(export_path) else: path = None export_location = export_to_markdown(self.db_path, path) return { "success": True, "exported_to": str(export_location), "message": f"Knowledge base exported to {export_location}" } except Exception as e: return {"success": False, "error": str(e)} async def main(): """Run the enhanced MCP server""" memory_server = EnhancedClaudeMemoryMCP() print("✓ Enhanced Claude Memory MCP Server initialized") print("✓ Self-learning capabilities enabled") print("✓ Cost optimization via agentic-flow ready") # Run MCP server from mcp.server.stdio import stdio_server async with stdio_server() as (read_stream, write_stream): await memory_server.server.run( read_stream, write_stream, memory_server.server.create_initialization_options(), ) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/airmcp-com/mcp-standards'

If you have feedback or need assistance with the MCP directory API, please join our Discord server