execution_patterns.py•8.3 kB
"""
Dynamic execution patterns for intelligent ACP routing and execution.
"""
import logging
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum
logger = logging.getLogger(__name__)
class ExecutionStrategy(Enum):
"""Execution strategy for patterns."""
DIRECT = "direct"
SEQUENTIAL = "sequential"
PARALLEL = "parallel"
ADAPTIVE = "adaptive"
@dataclass
class ExecutionPattern:
"""Dynamic execution pattern for capability orchestration."""
name: str
capabilities: List[str]
strategy: ExecutionStrategy
description: str
examples: List[str]
prerequisites: Optional[List[str]] = None
expected_duration: str = "medium" # fast/medium/slow
complexity: str = "medium" # low/medium/high
def __post_init__(self):
if self.prerequisites is None:
self.prerequisites = []
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for LLM context."""
return {
"name": self.name,
"capabilities": self.capabilities,
"strategy": self.strategy.value,
"description": self.description,
"examples": self.examples,
"prerequisites": self.prerequisites,
"expected_duration": self.expected_duration,
"complexity": self.complexity
}
class ExecutionPatternRegistry:
"""Registry for dynamic execution patterns with persistence."""
def __init__(self, persistence=None):
self.patterns: Dict[str, ExecutionPattern] = {}
self.persistence = persistence
self._patterns_loaded = False
self._initialize_default_patterns()
async def load_patterns(self):
"""Load patterns from persistence."""
if self.persistence and not self._patterns_loaded:
try:
loaded = await self.persistence.load_patterns()
if loaded:
# Convert dict back to ExecutionPattern objects
for name, pattern_dict in loaded.items():
self.patterns[name] = ExecutionPattern(**pattern_dict)
self._patterns_loaded = True
except Exception as e:
logger.warning(f"Failed to load patterns from persistence: {e}")
async def save_patterns(self):
"""Save patterns to persistence."""
if self.persistence:
try:
# Convert to dict for serialization
pattern_dict = {name: pattern.to_dict() for name, pattern in self.patterns.items()}
await self.persistence.save_patterns(pattern_dict)
except Exception as e:
logger.warning(f"Failed to save patterns to persistence: {e}")
def _register_sync(self, pattern: ExecutionPattern):
"""Register a new pattern synchronously (for initialization)."""
self.patterns[pattern.name] = pattern
def _initialize_default_patterns(self):
"""Initialize default execution patterns based on current capabilities."""
# Web Research Pattern
self._register_sync(ExecutionPattern(
name="web_research",
capabilities=["web_search", "web_scrape"],
strategy=ExecutionStrategy.SEQUENTIAL,
description="Search web for information then extract detailed content from relevant sources",
examples=[
"Research machine learning tutorials and extract key points",
"Find information about Python frameworks and get detailed content",
"Search for news articles and extract full text"
],
expected_duration="medium",
complexity="medium"
))
# System Analysis Pattern
self._register_sync(ExecutionPattern(
name="system_analysis",
capabilities=["acp_inspect", "acp_performance_metrics", "acp_learning_summary"],
strategy=ExecutionStrategy.PARALLEL,
description="Comprehensive system health and performance analysis",
examples=[
"Check system health and performance",
"Get complete system overview",
"Analyze system capabilities and learning status"
],
expected_duration="fast",
complexity="low"
))
# Feedback Loop Pattern
self._register_sync(ExecutionPattern(
name="feedback_optimization",
capabilities=["acp_feedback_submit", "acp_feedback_summary", "acp_performance_metrics"],
strategy=ExecutionStrategy.SEQUENTIAL,
description="Submit feedback and analyze performance trends for optimization",
examples=[
"Report capability performance issues and get analysis",
"Submit user feedback and review system response",
"Analyze capability performance over time"
],
expected_duration="medium",
complexity="medium"
))
# Capability Development Pattern
self._register_sync(ExecutionPattern(
name="capability_development",
capabilities=["acp_propose", "acp_compose", "acp_heal"],
strategy=ExecutionStrategy.ADAPTIVE,
description="Propose, compose, and validate new capabilities or workflows",
examples=[
"Create a new capability for data analysis",
"Design a workflow for automated testing",
"Develop solution for system issues"
],
expected_duration="slow",
complexity="high"
))
# Learning & Adaptation Pattern
self._register_sync(ExecutionPattern(
name="learning_adaptation",
capabilities=["acp_learning_summary", "acp_performance_metrics", "acp_feedback_summary"],
strategy=ExecutionStrategy.ADAPTIVE,
description="Analyze learning progress and adapt system behavior",
examples=[
"Review system learning and suggest improvements",
"Analyze performance trends and adapt strategies",
"Evaluate feedback patterns and optimize capabilities"
],
expected_duration="medium",
complexity="high"
))
async def register(self, pattern: ExecutionPattern):
"""Register a new execution pattern."""
self.patterns[pattern.name] = pattern
# Save to persistence if available
if self.persistence:
await self.save_patterns()
def get_pattern(self, name: str) -> Optional[ExecutionPattern]:
"""Get execution pattern by name."""
return self.patterns.get(name)
def get_patterns_for_capability(self, capability: str) -> List[ExecutionPattern]:
"""Get all patterns that include a specific capability."""
return [pattern for pattern in self.patterns.values()
if capability in pattern.capabilities]
def get_all_patterns(self) -> List[ExecutionPattern]:
"""Get all registered patterns."""
return list(self.patterns.values())
def get_patterns_by_strategy(self, strategy: ExecutionStrategy) -> List[ExecutionPattern]:
"""Get patterns filtered by execution strategy."""
return [pattern for pattern in self.patterns.values()
if pattern.strategy == strategy]
def get_llm_context(self, max_patterns: int = 10) -> str:
"""Get formatted context for LLM with most relevant patterns."""
patterns = list(self.patterns.values())[:max_patterns]
context = "## Available Execution Patterns:\n\n"
for pattern in patterns:
context += f"### {pattern.name} ({pattern.strategy.value})\n"
context += f"**Description**: {pattern.description}\n"
context += f"**Capabilities**: {', '.join(pattern.capabilities)}\n"
context += f"**Examples**: {', '.join(pattern.examples[:2])}\n"
context += f"**Complexity**: {pattern.complexity}, **Duration**: {pattern.expected_duration}\n\n"
return context