capability_registry.py•24.1 kB
"""
Dynamic capability metadata registry for intelligent routing.
"""
import logging
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
from enum import Enum
# Import heuristics from ACP system
from ..acp.heuristics import HeuristicTags, RiskLevel, Complexity, Scope, Persistence, Testing, Approval
logger = logging.getLogger(__name__)
class CapabilityType(Enum):
"""Types of capabilities."""
WEB = "web"
ACP = "acp"
SYSTEM = "system"
WORKFLOW = "workflow"
class CapabilityScope(Enum):
"""Scope of capabilities."""
SINGLE = "single" # Single operation
MULTI = "multi" # Multi-step workflow
ANALYSIS = "analysis" # Analysis/monitoring
CREATIVE = "creative" # Generation/creation
@dataclass
class CapabilityMetadata:
"""Metadata for a capability with integrated heuristics."""
name: str
description: str
capability_type: CapabilityType
scope: CapabilityScope
input_schema: Dict[str, Any]
examples: List[str]
tags: List[str]
dependencies: Optional[List[str]] = None
performance_tier: str = "standard" # fast/standard/slow
complexity: str = "medium" # low/medium/high
# NEW: Heuristic integration
heuristic_tags: Optional[HeuristicTags] = None
risk_level: str = "medium" # low/medium/high/critical
approval_required: bool = False
testing_level: str = "basic" # none/basic/comprehensive/parallel
persistence_level: str = "session" # ephemeral/session/persistent/permanent
def __post_init__(self):
if self.dependencies is None:
self.dependencies = []
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for LLM context."""
result = {
"name": self.name,
"description": self.description,
"capability_type": self.capability_type.value,
"capability_scope": self.scope.value,
"input_schema": self.input_schema,
"examples": self.examples,
"tags": self.tags,
"dependencies": self.dependencies or [],
"performance_tier": self.performance_tier,
"complexity": self.complexity,
# NEW: Heuristic information
"risk_level": self.risk_level,
"approval_required": self.approval_required,
"testing_level": self.testing_level,
"persistence_level": self.persistence_level
}
# Add heuristic tags if available
if self.heuristic_tags:
result["heuristic_tags"] = self.heuristic_tags.to_dict()
return result
class CapabilityRegistry:
"""Registry for dynamic capability metadata with persistence."""
def __init__(self, persistence=None):
self.capabilities: Dict[str, CapabilityMetadata] = {}
self.persistence = persistence
# Note: We'll load asynchronously after initialization
self._capabilities_loaded = False
if not self.capabilities:
self._initialize_default_capabilities()
# Initialize heuristics for all default capabilities
self._initialize_heuristics_for_all_capabilities()
def _initialize_heuristics_for_all_capabilities(self):
"""Initialize heuristics for all existing capabilities."""
for name, capability in self.capabilities.items():
if capability.heuristic_tags is None:
capability.heuristic_tags = self._evaluate_capability_heuristics(capability)
self._update_heuristic_derived_fields(capability)
async def load_capabilities(self):
"""Load capabilities from persistence."""
if self.persistence and not self._capabilities_loaded:
try:
loaded = await self.persistence.load_capabilities()
if loaded:
# Convert dict back to CapabilityMetadata objects
for name, cap_dict in loaded.items():
# Handle heuristic tags reconstruction
heuristic_tags = None
if 'heuristic_tags' in cap_dict:
from ..acp.heuristics import HeuristicTags, RiskLevel, Complexity, Scope, Persistence, Testing, Approval
ht = cap_dict['heuristic_tags']
heuristic_tags = HeuristicTags(
risk=RiskLevel(ht['risk']),
complexity=Complexity(ht['complexity']),
scope=Scope(ht['scope']),
persistence=Persistence(ht['persistence']),
testing=Testing(ht['testing']),
approval=Approval(ht['approval'])
)
# Create CapabilityMetadata with required fields
self.capabilities[name] = CapabilityMetadata(
name=cap_dict['name'],
description=cap_dict['description'],
capability_type=CapabilityType(cap_dict['capability_type']),
scope=CapabilityScope(cap_dict['capability_scope']),
input_schema=cap_dict['input_schema'],
examples=cap_dict['examples'],
tags=cap_dict['tags'],
dependencies=cap_dict.get('dependencies'),
performance_tier=cap_dict.get('performance_tier', 'standard'),
complexity=cap_dict.get('complexity', 'medium'),
# Heuristic fields
heuristic_tags=heuristic_tags,
risk_level=cap_dict.get('risk_level', 'medium'),
approval_required=cap_dict.get('approval_required', False),
testing_level=cap_dict.get('testing_level', 'basic'),
persistence_level=cap_dict.get('persistence_level', 'session')
)
self._capabilities_loaded = True
except Exception as e:
logger.warning(f"Failed to load capabilities from persistence: {e}")
async def save_capabilities(self):
"""Load capabilities from persistence."""
if self.persistence:
try:
loaded = await self.persistence.load_capabilities()
if loaded:
# Convert dict back to CapabilityMetadata objects
for name, cap_dict in loaded.items():
self.capabilities[name] = CapabilityMetadata(**cap_dict)
except Exception as e:
logger.warning(f"Failed to load capabilities from persistence: {e}")
async def _save_capabilities(self):
"""Save capabilities to persistence."""
if self.persistence:
try:
# Convert to dict for serialization
cap_dict = {name: cap.to_dict() for name, cap in self.capabilities.items()}
await self.persistence.save_capabilities(cap_dict)
except Exception as e:
logger.warning(f"Failed to save capabilities to persistence: {e}")
def _initialize_default_capabilities(self):
"""Initialize default capabilities based on current system."""
# Web Capabilities
self._register_sync(CapabilityMetadata(
name="web_search",
description="Search for information using multiple search engines",
capability_type=CapabilityType.WEB,
scope=CapabilityScope.SINGLE,
input_schema={
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer", "default": 5}
},
"required": ["query"]
},
examples=[
"Search for machine learning tutorials",
"Find information about Python frameworks",
"Look up recent news about AI"
],
tags=["search", "research", "information", "web"],
performance_tier="standard",
complexity="low",
# Heuristics will be auto-evaluated
heuristic_tags=None
))
self._register_sync(CapabilityMetadata(
name="web_scrape",
description="Extract and convert content from web pages to structured format",
capability_type=CapabilityType.WEB,
scope=CapabilityScope.SINGLE,
input_schema={
"type": "object",
"properties": {
"url": {"type": "string"},
"format": {"type": "string", "enum": ["text", "markdown"], "default": "markdown"}
},
"required": ["url"]
},
examples=[
"Extract content from Wikipedia article",
"Convert web page to markdown format",
"Get text content from documentation site"
],
tags=["scraping", "extraction", "content", "web"],
performance_tier="standard",
complexity="medium",
heuristic_tags=None
))
# ACP System Capabilities
self._register_sync(CapabilityMetadata(
name="acp_inspect",
description="Inspect system capabilities, health status, and configuration",
capability_type=CapabilityType.ACP,
scope=CapabilityScope.ANALYSIS,
input_schema={"type": "object", "properties": {}, "required": []},
examples=[
"Check system health and status",
"List all available capabilities",
"Get system configuration overview"
],
tags=["system", "health", "inspection", "status"],
performance_tier="fast",
complexity="low",
heuristic_tags=None
))
self._register_sync(CapabilityMetadata(
name="acp_propose",
description="Propose new capabilities based on user needs and system analysis",
capability_type=CapabilityType.ACP,
scope=CapabilityScope.CREATIVE,
input_schema={
"type": "object",
"properties": {
"need": {"type": "string"},
"context": {"type": "object"}
},
"required": ["need"]
},
examples=[
"Propose a capability for data visualization",
"Suggest automation for repetitive tasks",
"Design solution for workflow optimization"
],
tags=["proposal", "development", "innovation", "design"],
performance_tier="slow",
complexity="high"
))
self._register_sync(CapabilityMetadata(
name="acp_compose",
description="Compose multiple capabilities into automated workflows",
capability_type=CapabilityType.ACP,
scope=CapabilityScope.MULTI,
input_schema={
"type": "object",
"properties": {
"capabilities": {"type": "array", "items": {"type": "string"}},
"workflow_name": {"type": "string"}
},
"required": ["capabilities", "workflow_name"]
},
examples=[
"Create workflow for automated testing",
"Compose data analysis pipeline",
"Build automated report generation"
],
tags=["workflow", "composition", "automation", "orchestration"],
performance_tier="standard",
complexity="high"
))
self._register_sync(CapabilityMetadata(
name="acp_heal",
description="Diagnose and heal system issues automatically",
capability_type=CapabilityType.ACP,
scope=CapabilityScope.ANALYSIS,
input_schema={
"type": "object",
"properties": {
"issue": {"type": "object"}
},
"required": ["issue"]
},
examples=[
"Heal system performance issues",
"Recover from capability failures",
"Fix configuration problems"
],
tags=["healing", "recovery", "diagnostics", "maintenance"],
performance_tier="standard",
complexity="medium",
heuristic_tags=None
))
# Feedback and Learning Capabilities
self._register_sync(CapabilityMetadata(
name="acp_feedback_submit",
description="Submit feedback for capability execution and performance",
capability_type=CapabilityType.ACP,
scope=CapabilityScope.SINGLE,
input_schema={
"type": "object",
"properties": {
"capability_id": {"type": "string"},
"rating": {"type": "integer", "minimum": 1, "maximum": 5},
"comment": {"type": "string"}
},
"required": ["capability_id"]
},
examples=[
"Rate web search performance",
"Provide feedback on capability results",
"Report issues with specific features"
],
tags=["feedback", "rating", "improvement", "quality"],
performance_tier="fast",
complexity="low",
heuristic_tags=None
))
self._register_sync(CapabilityMetadata(
name="acp_feedback_summary",
description="Analyze feedback trends and generate insights for improvement",
capability_type=CapabilityType.ACP,
scope=CapabilityScope.ANALYSIS,
input_schema={
"type": "object",
"properties": {
"capability_id": {"type": "string"},
"days_back": {"type": "integer", "default": 30}
},
"required": []
},
examples=[
"Get feedback summary for web search",
"Analyze user satisfaction trends",
"Review capability performance feedback"
],
tags=["analysis", "feedback", "trends", "insights"],
performance_tier="standard",
complexity="medium",
heuristic_tags=None
))
self._register_sync(CapabilityMetadata(
name="acp_performance_metrics",
description="Get detailed performance metrics and analytics for capabilities",
capability_type=CapabilityType.ACP,
scope=CapabilityScope.ANALYSIS,
input_schema={
"type": "object",
"properties": {
"capability_id": {"type": "string"},
"days_back": {"type": "integer", "default": 7}
},
"required": []
},
examples=[
"Get performance metrics for all capabilities",
"Analyze web search performance trends",
"Monitor system resource usage"
],
tags=["performance", "metrics", "monitoring", "analytics"],
performance_tier="standard",
complexity="medium",
heuristic_tags=None
))
self._register_sync(CapabilityMetadata(
name="acp_learning_summary",
description="Get comprehensive summary of adaptive learning system status and progress",
capability_type=CapabilityType.ACP,
scope=CapabilityScope.ANALYSIS,
input_schema={"type": "object", "properties": {}, "required": []},
examples=[
"Review system learning progress",
"Get adaptive learning insights",
"Check heuristic improvement status"
],
tags=["learning", "adaptation", "intelligence", "progress"],
performance_tier="standard",
complexity="medium",
heuristic_tags=None
))
def _register_sync(self, capability: CapabilityMetadata):
"""Register a new capability synchronously (for initialization)."""
self.capabilities[capability.name] = capability
async def register(self, capability: CapabilityMetadata):
"""Register a new capability with heuristic evaluation."""
# Auto-evaluate heuristics if not provided
if capability.heuristic_tags is None:
capability.heuristic_tags = self._evaluate_capability_heuristics(capability)
# Update derived fields
self._update_heuristic_derived_fields(capability)
self.capabilities[capability.name] = capability
# Save to persistence if available
if self.persistence:
await self.save_capabilities()
def _evaluate_capability_heuristics(self, capability: CapabilityMetadata) -> HeuristicTags:
"""Evaluate heuristics for a capability."""
from ..acp.heuristics import HeuristicEngine
engine = HeuristicEngine()
# Create operation representation for heuristic evaluation
operation = {
"type": "create" if capability.name.startswith("web_") else "analyze",
"target": capability.name,
"dependencies": capability.dependencies or [],
"complexity": capability.complexity,
"capability_type": capability.capability_type.value
}
return engine.evaluate_operation(operation)
def _update_heuristic_derived_fields(self, capability: CapabilityMetadata):
"""Update capability fields based on heuristic evaluation."""
if capability.heuristic_tags:
# Update risk level
capability.risk_level = capability.heuristic_tags.risk.value
# Update approval requirement
from ..acp.heuristics import HeuristicEngine
engine = HeuristicEngine()
capability.approval_required = not engine.can_auto_approve(capability.heuristic_tags)
# Update testing level
capability.testing_level = capability.heuristic_tags.testing.value
# Update persistence level
capability.persistence_level = capability.heuristic_tags.persistence.value
def get_capability(self, name: str) -> Optional[CapabilityMetadata]:
"""Get capability metadata by name."""
return self.capabilities.get(name)
def get_all_capabilities(self) -> List[CapabilityMetadata]:
"""Get all registered capabilities."""
return list(self.capabilities.values())
def get_capabilities_by_type(self, capability_type: CapabilityType) -> List[CapabilityMetadata]:
"""Get capabilities filtered by type."""
return [cap for cap in self.capabilities.values()
if cap.capability_type == capability_type]
def get_capabilities_by_tag(self, tag: str) -> List[CapabilityMetadata]:
"""Get capabilities filtered by tag."""
return [cap for cap in self.capabilities.values()
if tag in cap.tags]
def search_capabilities(self, query: str) -> List[CapabilityMetadata]:
"""Search capabilities by name, description, or tags."""
query = query.lower()
results = []
for cap in self.capabilities.values():
# Search in name
if query in cap.name.lower():
results.append(cap)
continue
# Search in description
if query in cap.description.lower():
results.append(cap)
continue
# Search in tags
if any(query in tag.lower() for tag in cap.tags):
results.append(cap)
return results
def get_capabilities_by_risk(self, risk_level: str) -> List[CapabilityMetadata]:
"""Get capabilities filtered by risk level."""
return [cap for cap in self.capabilities.values()
if cap.risk_level == risk_level]
def get_capabilities_requiring_approval(self) -> List[CapabilityMetadata]:
"""Get capabilities that require approval."""
return [cap for cap in self.capabilities.values()
if cap.approval_required]
def get_capabilities_by_testing_level(self, testing_level: str) -> List[CapabilityMetadata]:
"""Get capabilities filtered by testing requirements."""
return [cap for cap in self.capabilities.values()
if cap.testing_level == testing_level]
def get_safe_capabilities(self) -> List[CapabilityMetadata]:
"""Get capabilities that are safe for auto-execution."""
from ..acp.heuristics import HeuristicEngine
engine = HeuristicEngine()
safe_caps = []
for cap in self.capabilities.values():
if cap.heuristic_tags and engine.can_auto_approve(cap.heuristic_tags):
safe_caps.append(cap)
return safe_caps
def evaluate_operation_safety(self, operation: Dict[str, Any]) -> Dict[str, Any]:
"""Evaluate operation safety using heuristics."""
from ..acp.heuristics import HeuristicEngine
engine = HeuristicEngine()
tags = engine.evaluate_operation(operation)
return {
"heuristic_tags": tags.to_dict(),
"can_auto_approve": engine.can_auto_approve(tags),
"needs_parallel_testing": engine.needs_parallel_testing(tags),
"needs_git_commit": engine.needs_git_commit(tags),
"needs_manual_approval": engine.needs_manual_approval(tags),
"safety_score": self._calculate_safety_score(tags)
}
def _calculate_safety_score(self, tags: HeuristicTags) -> float:
"""Calculate safety score from heuristic tags (0.0 to 1.0)."""
# Risk scoring (lower is better)
risk_scores = {"low": 1.0, "medium": 0.7, "high": 0.4, "critical": 0.1}
risk_score = risk_scores.get(tags.risk.value, 0.5)
# Complexity scoring (lower is better)
complexity_scores = {"simple": 1.0, "moderate": 0.8, "complex": 0.5, "architectural": 0.2}
complexity_score = complexity_scores.get(tags.complexity.value, 0.7)
# Scope scoring (capability is safest)
scope_scores = {"capability": 1.0, "workflow": 0.8, "core": 0.5, "security": 0.2}
scope_score = scope_scores.get(tags.scope.value, 0.8)
# Weighted average
return (risk_score * 0.4 + complexity_score * 0.3 + scope_score * 0.3)
def get_llm_context(self, max_capabilities: int = 15) -> str:
"""Get formatted context for LLM with most relevant capabilities."""
capabilities = list(self.capabilities.values())[:max_capabilities]
context = "## Available Capabilities:\n\n"
for cap in capabilities:
context += f"### {cap.name} ({cap.capability_type.value})\n"
context += f"**Description**: {cap.description}\n"
context += f"**Scope**: {cap.scope.value}, **Complexity**: {cap.complexity}\n"
context += f"**Tags**: {', '.join(cap.tags[:3])}\n"
context += f"**Examples**: {', '.join(cap.examples[:2])}\n\n"
return context