enhanced_server.py•22.9 kB
"""Enhanced Claude Memory MCP Server with Self-Learning Capabilities
Integrates:
1. Hooks-based automatic learning
2. Pattern extraction and preference learning
3. CLAUDE.md auto-generation
4. Agent performance tracking
5. Validation gates
6. Cost-optimized model routing via agentic-flow
All simple operations routed to cheap models (99.5% cost savings) while
maintaining premium quality for complex reasoning.
"""
import asyncio
import sqlite3
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.types import Tool, TextContent
# Import existing components
from .autolog import get_autologger
from .export import export_to_markdown
# Import new self-learning components
from .schema_migration import SchemaMigration
from .model_router import get_model_router, get_agentic_flow_integration
# Import intelligence layer
from .hooks.pattern_extractor import PatternExtractor
from .intelligence.claudemd_manager import ClaudeMdManager
from .intelligence.temporal_graph import TemporalKnowledgeGraph
from .intelligence.validation_engine import ValidationEngine
from .intelligence.agent_tracker import AgentPerformanceTracker
class EnhancedClaudeMemoryMCP:
"""Enhanced Claude-native knowledge storage with self-learning"""
def __init__(self, db_path: Optional[str] = None):
self.server = Server("mcp-standards-enhanced")
# Use centralized location
if db_path is None:
self.db_path = Path.home() / ".mcp-standards" / "knowledge.db"
else:
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
# Run database migrations
migration = SchemaMigration(self.db_path)
if not migration.migrate():
print("Warning: Database migration failed")
# Initialize components
self.autologger = get_autologger(self.db_path)
self.pattern_extractor = PatternExtractor(self.db_path)
self.claudemd_manager = ClaudeMdManager(self.db_path)
self.temporal_graph = TemporalKnowledgeGraph(self.db_path)
self.validation_engine = ValidationEngine(self.db_path)
self.agent_tracker = AgentPerformanceTracker(self.db_path)
# Initialize cost optimization
self.model_router = get_model_router()
self.agentic_flow = get_agentic_flow_integration()
self._setup_handlers()
def _setup_handlers(self):
"""Setup MCP protocol handlers"""
@self.server.list_tools()
async def list_tools() -> List[Tool]:
"""List available tools"""
return [
# Original tools
Tool(
name="add_episode",
description="Add knowledge episode to memory (uses cost-efficient model)",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Episode name"},
"content": {"type": "string", "description": "Episode content"},
"source": {"type": "string", "description": "Source", "default": "user"},
},
"required": ["name", "content"],
},
),
Tool(
name="search_episodes",
description="Search knowledge episodes (uses cost-efficient model)",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"limit": {"type": "integer", "description": "Max results", "default": 10},
},
"required": ["query"],
},
),
Tool(
name="list_recent",
description="List recent episodes (uses cost-efficient model)",
inputSchema={
"type": "object",
"properties": {
"limit": {"type": "integer", "description": "Max results", "default": 10},
},
},
),
# New self-learning tools
Tool(
name="learn_preference",
description="Manually add a learned preference to memory",
inputSchema={
"type": "object",
"properties": {
"category": {"type": "string", "description": "Category (e.g., 'python-package')"},
"context": {"type": "string", "description": "When to apply this preference"},
"preference": {"type": "string", "description": "The preference/rule to apply"},
"project_specific": {"type": "boolean", "description": "Project-specific?", "default": False},
"project_path": {"type": "string", "description": "Project path (if project_specific)"},
},
"required": ["category", "context", "preference"],
},
),
Tool(
name="suggest_claudemd_update",
description="Get suggested CLAUDE.md updates based on learned patterns",
inputSchema={
"type": "object",
"properties": {
"project_path": {"type": "string", "description": "Project path (optional)"},
"min_confidence": {"type": "number", "description": "Min confidence", "default": 0.7},
},
},
),
Tool(
name="generate_claudemd",
description="Generate CLAUDE.md content from learned preferences",
inputSchema={
"type": "object",
"properties": {
"project_path": {"type": "string", "description": "Project path (optional)"},
"output_file": {"type": "string", "description": "File to write to (optional)"},
"min_confidence": {"type": "number", "description": "Min confidence", "default": 0.7},
},
},
),
Tool(
name="validate_spec",
description="Validate deliverable against specification",
inputSchema={
"type": "object",
"properties": {
"task_description": {"type": "string", "description": "Original task/spec"},
"deliverable_summary": {"type": "string", "description": "What was built"},
},
"required": ["task_description", "deliverable_summary"],
},
),
Tool(
name="check_quality_gates",
description="Check if quality gates are satisfied",
inputSchema={
"type": "object",
"properties": {
"project_path": {"type": "string", "description": "Project path"},
"gate_types": {"type": "array", "items": {"type": "string"}, "description": "Gate types to check"},
},
"required": ["project_path"],
},
),
Tool(
name="get_agent_suggestions",
description="Get agent recommendations based on task and performance history",
inputSchema={
"type": "object",
"properties": {
"task_category": {"type": "string", "description": "Type of task"},
"context": {"type": "string", "description": "Additional context"},
},
"required": ["task_category"],
},
),
Tool(
name="query_agent_performance",
description="Query agent performance statistics",
inputSchema={
"type": "object",
"properties": {
"agent_type": {"type": "string", "description": "Agent type (optional)"},
"task_category": {"type": "string", "description": "Task category (optional)"},
"time_range_days": {"type": "integer", "description": "Days to look back", "default": 30},
},
},
),
Tool(
name="log_agent_execution",
description="Log agent execution for performance tracking",
inputSchema={
"type": "object",
"properties": {
"agent_type": {"type": "string", "description": "Agent type"},
"task_category": {"type": "string", "description": "Task category"},
"success": {"type": "boolean", "description": "Success?"},
"correction_count": {"type": "integer", "description": "Corrections needed", "default": 0},
"execution_time": {"type": "number", "description": "Time in seconds"},
"project_path": {"type": "string", "description": "Project path"},
},
"required": ["agent_type", "task_category", "success"],
},
),
Tool(
name="get_cost_savings",
description="Get cost savings report from model routing",
inputSchema={
"type": "object",
"properties": {},
},
),
Tool(
name="export_to_markdown",
description="Export knowledge base to markdown files",
inputSchema={
"type": "object",
"properties": {
"export_path": {"type": "string", "description": "Export path (optional)"},
},
},
),
]
@self.server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle tool calls with intelligent model routing"""
# Route operation to appropriate model
routing = self.model_router.route_task(name)
# Original tools
if name == "add_episode":
result = await self._add_episode(
arguments["name"],
arguments["content"],
arguments.get("source", "user"),
)
result["routing"] = routing # Include routing info
return [TextContent(type="text", text=json.dumps(result))]
elif name == "search_episodes":
results = await self._search_episodes(
arguments["query"],
arguments.get("limit", 10)
)
results["routing"] = routing
return [TextContent(type="text", text=json.dumps(results))]
elif name == "list_recent":
results = await self._list_recent(arguments.get("limit", 10))
results["routing"] = routing
return [TextContent(type="text", text=json.dumps(results))]
# New self-learning tools
elif name == "learn_preference":
result = await self._learn_preference(
arguments["category"],
arguments["context"],
arguments["preference"],
arguments.get("project_specific", False),
arguments.get("project_path", "")
)
return [TextContent(type="text", text=json.dumps(result))]
elif name == "suggest_claudemd_update":
suggestions = self.claudemd_manager.suggest_updates(
project_path=arguments.get("project_path"),
min_confidence=arguments.get("min_confidence", 0.7)
)
return [TextContent(type="text", text=json.dumps({
"suggestions": suggestions,
"count": len(suggestions)
}))]
elif name == "generate_claudemd":
content = self.claudemd_manager.generate_claudemd_content(
project_path=arguments.get("project_path"),
min_confidence=arguments.get("min_confidence", 0.7)
)
# Optionally write to file
output_file = arguments.get("output_file")
if output_file:
file_path = Path(output_file)
success, message = self.claudemd_manager.update_claudemd_file(
file_path,
arguments.get("project_path"),
arguments.get("min_confidence", 0.7)
)
return [TextContent(type="text", text=json.dumps({
"success": success,
"message": message,
"content": content
}))]
return [TextContent(type="text", text=json.dumps({
"content": content
}))]
elif name == "validate_spec":
# Capture spec first
spec_id = self.validation_engine.capture_spec(
arguments["task_description"],
""
)
# Validate
result = self.validation_engine.validate_spec(
spec_id,
arguments["deliverable_summary"]
)
return [TextContent(type="text", text=json.dumps(result))]
elif name == "check_quality_gates":
result = self.validation_engine.check_quality_gates(
arguments["project_path"],
arguments.get("gate_types")
)
return [TextContent(type="text", text=json.dumps(result))]
elif name == "get_agent_suggestions":
suggestions = self.agent_tracker.recommend_agent(
arguments["task_category"],
arguments.get("context")
)
return [TextContent(type="text", text=json.dumps({
"suggestions": suggestions,
"task_category": arguments["task_category"]
}))]
elif name == "query_agent_performance":
stats = self.agent_tracker.get_agent_stats(
agent_type=arguments.get("agent_type"),
task_category=arguments.get("task_category"),
time_range_days=arguments.get("time_range_days", 30)
)
return [TextContent(type="text", text=json.dumps(stats))]
elif name == "log_agent_execution":
log_id = self.agent_tracker.log_agent_execution(
arguments["agent_type"],
arguments["task_category"],
arguments["success"],
arguments.get("correction_count", 0),
arguments.get("execution_time"),
None,
arguments.get("project_path", "")
)
return [TextContent(type="text", text=json.dumps({
"success": True,
"log_id": log_id
}))]
elif name == "get_cost_savings":
report = self.agentic_flow.get_cost_report()
return [TextContent(type="text", text=json.dumps(report))]
elif name == "export_to_markdown":
result = await self._export_to_markdown(
arguments.get("export_path")
)
return [TextContent(type="text", text=json.dumps(result))]
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
# Original methods (unchanged)
async def _add_episode(self, name: str, content: str, source: str = "user") -> Dict[str, Any]:
"""Add episode to memory"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"INSERT INTO episodes (name, content, source) VALUES (?, ?, ?)",
(name, content, source)
)
episode_id = cursor.lastrowid
# Update search index
conn.execute(
"INSERT INTO episodes_search (rowid, name, content, source) VALUES (?, ?, ?, ?)",
(episode_id, name, content, source)
)
conn.commit()
return {
"success": True,
"id": episode_id,
"message": f"Episode '{name}' added successfully"
}
except Exception as e:
return {"success": False, "error": str(e)}
async def _search_episodes(self, query: str, limit: int = 10) -> Dict[str, Any]:
"""Search episodes using FTS"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
try:
cursor = conn.execute("""
SELECT e.id, e.name, e.content, e.source, e.timestamp,
rank
FROM episodes_search
JOIN episodes e ON episodes_search.rowid = e.id
WHERE episodes_search MATCH ?
ORDER BY rank
LIMIT ?
""", (query, limit))
except sqlite3.OperationalError:
cursor = conn.execute("""
SELECT id, name, content, source, timestamp
FROM episodes
WHERE name LIKE ? OR content LIKE ?
ORDER BY timestamp DESC
LIMIT ?
""", (f"%{query}%", f"%{query}%", limit))
episodes = [dict(row) for row in cursor.fetchall()]
return {
"success": True,
"query": query,
"results": episodes,
"count": len(episodes)
}
except Exception as e:
return {"success": False, "error": str(e)}
async def _list_recent(self, limit: int = 10) -> Dict[str, Any]:
"""List recent episodes"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT id, name, content, source, timestamp
FROM episodes
ORDER BY timestamp DESC
LIMIT ?
""", (limit,))
episodes = [dict(row) for row in cursor.fetchall()]
return {
"success": True,
"results": episodes,
"count": len(episodes)
}
except Exception as e:
return {"success": False, "error": str(e)}
async def _learn_preference(
self,
category: str,
context: str,
preference: str,
project_specific: bool = False,
project_path: str = ""
) -> Dict[str, Any]:
"""Manually add a learned preference"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
INSERT INTO tool_preferences (
category, context, preference, confidence,
project_specific, project_path, created_at
) VALUES (?, ?, ?, 1.0, ?, ?, ?)
""", (
category, context, preference,
project_specific, project_path,
datetime.now().isoformat()
))
pref_id = cursor.lastrowid
# Record in temporal graph
self.temporal_graph.record_preference_change(
pref_id,
"created",
reason="manually added"
)
conn.commit()
return {
"success": True,
"preference_id": pref_id,
"message": f"Preference added: {preference}"
}
except Exception as e:
return {"success": False, "error": str(e)}
async def _export_to_markdown(self, export_path: Optional[str] = None) -> Dict[str, Any]:
"""Export knowledge base to markdown"""
try:
if export_path:
path = Path(export_path)
else:
path = None
export_location = export_to_markdown(self.db_path, path)
return {
"success": True,
"exported_to": str(export_location),
"message": f"Knowledge base exported to {export_location}"
}
except Exception as e:
return {"success": False, "error": str(e)}
async def main():
"""Run the enhanced MCP server"""
memory_server = EnhancedClaudeMemoryMCP()
print("✓ Enhanced Claude Memory MCP Server initialized")
print("✓ Self-learning capabilities enabled")
print("✓ Cost optimization via agentic-flow ready")
# Run MCP server
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await memory_server.server.run(
read_stream,
write_stream,
memory_server.server.create_initialization_options(),
)
if __name__ == "__main__":
asyncio.run(main())