# system_context.py
"""
System context integration for intelligent ACP routing and execution.
"""
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio
logger = logging.getLogger(__name__)
@dataclass
class SystemStatus:
    """Snapshot of the system's current operating state."""

    # Operating mode: "active", "learning", "healing" or "maintenance".
    acp_status: str
    # Number of workflows currently counted as active.
    active_workflows: int
    # Number of errors recorded recently.
    recent_errors: int
    # Performance score, from 0.0 to 1.0.
    performance_score: float
    # Current learning rate.
    learning_rate: float
    # When this status was last refreshed.
    last_update: datetime
@dataclass
class PerformanceMetrics:
    """Recently observed performance figures."""

    # Average response time, in milliseconds.
    avg_response_time: float
    # Fraction of successful requests, from 0.0 to 1.0.
    success_rate: float
    # Fraction of failed requests, from 0.0 to 1.0.
    error_rate: float
    # Requests handled per minute.
    throughput: int
    # Usage per resource name (cpu, memory, etc.).
    resource_usage: Dict[str, float]
@dataclass
class WorkflowContext:
    """Describes one tracked workflow.

    ``estimated_completion`` defaults to ``None`` so callers that have no
    estimate can omit it instead of passing ``None`` explicitly.
    """

    # Identifier used to look the workflow up, update it or remove it.
    workflow_id: str
    # Workflow name.
    name: str
    # One of "running", "completed", "failed" or "paused".
    status: str
    # Completion fraction, from 0.0 to 1.0.
    progress: float
    # Names of the capabilities this workflow has used.
    capabilities_used: List[str]
    # When the workflow started.
    start_time: datetime
    # Expected finish time, or None when no estimate is available.
    estimated_completion: Optional[datetime] = None
class SystemContextManager:
    """Manages system context for intelligent routing with persistence.

    Holds a ``SystemStatus``, a ``PerformanceMetrics`` and a list of tracked
    ``WorkflowContext`` objects, and renders them either as a dict summary or
    as Markdown text for an LLM prompt. When a ``persistence`` object is
    supplied, status and metrics can be loaded from and saved to it.
    """

    # Multiplier applied to the performance score per recorded error.
    # Any severity not listed here uses ``_DEFAULT_SEVERITY_FACTOR``.
    _SEVERITY_FACTORS = {"high": 0.9, "medium": 0.95}
    _DEFAULT_SEVERITY_FACTOR = 0.98

    def __init__(self, persistence: Optional[Any] = None):
        """Create a manager seeded with default status and metrics.

        Args:
            persistence: Optional backend. Only two awaitable methods are
                called on it: ``load_system_context()`` and
                ``save_system_context(data)``. ``None`` disables persistence.
        """
        self.persistence = persistence
        self._context_loaded = False
        self.status = SystemStatus(
            acp_status="active",
            active_workflows=0,
            recent_errors=0,
            performance_score=0.8,
            learning_rate=0.1,
            last_update=datetime.now(),
        )
        self.performance = PerformanceMetrics(
            avg_response_time=150.0,
            success_rate=0.95,
            error_rate=0.05,
            throughput=10,
            resource_usage={"cpu": 0.3, "memory": 0.4},
        )
        self.active_workflows: List[WorkflowContext] = []
        # NOTE(review): the cache and its TTL are initialised here but nothing
        # in this class reads them.
        self._context_cache: Dict[str, Any] = {}
        self._cache_ttl = timedelta(minutes=5)

    async def update_system_status(self) -> None:
        """Refresh the status timestamp, performance metrics and ACP status.

        The metric updates are placeholders for real monitoring: each call
        raises ``success_rate`` by 0.01 (capped at 0.99) and lowers
        ``avg_response_time`` by 5 ms (floored at 50 ms). On any exception the
        failure is logged and the status falls back to "active" with a
        performance score of 0.7.
        """
        try:
            self.status.last_update = datetime.now()

            # Update performance metrics (would integrate with actual monitoring)
            self.performance.success_rate = min(
                0.99, self.performance.success_rate + 0.01
            )
            self.performance.avg_response_time = max(
                50.0, self.performance.avg_response_time - 5.0
            )

            # A high learning rate takes precedence over the error count.
            if self.status.learning_rate > 0.2:
                self.status.acp_status = "learning"
            elif self.status.recent_errors > 5:
                self.status.acp_status = "healing"
            else:
                self.status.acp_status = "active"
        except Exception as e:
            # Best-effort refresh: report the failure, then use safe defaults.
            logger.warning("Failed to update system status: %s", e)
            self.status.acp_status = "active"
            self.status.performance_score = 0.7

    def add_workflow(self, workflow: "WorkflowContext") -> None:
        """Track ``workflow`` and refresh the active-workflow count."""
        self.active_workflows.append(workflow)
        self.status.active_workflows = len(self.active_workflows)

    def remove_workflow(self, workflow_id: str) -> None:
        """Stop tracking every workflow whose id equals ``workflow_id``."""
        self.active_workflows = [
            wf for wf in self.active_workflows if wf.workflow_id != workflow_id
        ]
        self.status.active_workflows = len(self.active_workflows)

    def update_workflow_progress(self, workflow_id: str, progress: float) -> None:
        """Set the progress of the first matching workflow, clamped to [0.0, 1.0].

        Does nothing when no tracked workflow has ``workflow_id``.
        """
        for workflow in self.active_workflows:
            if workflow.workflow_id == workflow_id:
                workflow.progress = min(1.0, max(0.0, progress))
                break

    def record_error(self, error_type: str, severity: str = "medium") -> None:
        """Count an error and scale the performance score down.

        Args:
            error_type: Accepted for the caller's benefit; not used here.
            severity: "high" multiplies the score by 0.9, "medium" by 0.95,
                and any other value by 0.98.
        """
        self.status.recent_errors += 1
        self.status.performance_score *= self._SEVERITY_FACTORS.get(
            severity, self._DEFAULT_SEVERITY_FACTOR
        )

    def get_context_summary(self) -> Dict[str, Any]:
        """Return the full system context as a dict of plain values.

        Datetimes are rendered with ``isoformat()``. Only the last five
        tracked workflows are included. Mutable members (``resource_usage``,
        ``capabilities_used``) are copied so that callers cannot modify the
        manager's state through the returned dict.
        """
        status = self.status
        perf = self.performance
        return {
            "system_status": {
                "acp_status": status.acp_status,
                "active_workflows": status.active_workflows,
                "recent_errors": status.recent_errors,
                "performance_score": status.performance_score,
                "learning_rate": status.learning_rate,
                "last_update": status.last_update.isoformat(),
            },
            "performance_metrics": {
                "avg_response_time": perf.avg_response_time,
                "success_rate": perf.success_rate,
                "error_rate": perf.error_rate,
                "throughput": perf.throughput,
                "resource_usage": dict(perf.resource_usage or {}),
            },
            "active_workflows": [
                {
                    "workflow_id": wf.workflow_id,
                    "name": wf.name,
                    "status": wf.status,
                    "progress": wf.progress,
                    "capabilities_used": list(wf.capabilities_used or []),
                    "start_time": wf.start_time.isoformat(),
                    "estimated_completion": (
                        wf.estimated_completion.isoformat()
                        if wf.estimated_completion
                        else None
                    ),
                }
                for wf in self.active_workflows[-5:]  # Last 5 workflows
            ],
            "system_health": self._calculate_health_score(),
            "recommendations": self._generate_recommendations(),
        }

    def _calculate_health_score(self) -> Dict[str, Any]:
        """Combine status and metrics into a weighted health score.

        Weights: performance score 0.4, success rate 0.3, (1 - error rate)
        0.2, and (1 - error impact) 0.1, where error impact is
        ``recent_errors / 10`` capped at 1.0.

        Returns:
            A dict with the rounded ``overall_score``, a ``status`` label
            ("excellent" >= 0.9, "good" >= 0.8, "fair" >= 0.7, else "poor")
            and the rounded contributing ``factors``.
        """
        error_impact = min(1.0, self.status.recent_errors / 10.0)
        health_score = (
            self.status.performance_score * 0.4
            + self.performance.success_rate * 0.3
            + (1.0 - self.performance.error_rate) * 0.2
            + (1.0 - error_impact) * 0.1
        )

        if health_score >= 0.9:
            status = "excellent"
        elif health_score >= 0.8:
            status = "good"
        elif health_score >= 0.7:
            status = "fair"
        else:
            status = "poor"

        return {
            "overall_score": round(health_score, 3),
            "status": status,
            "factors": {
                "performance": round(self.status.performance_score, 3),
                "success_rate": round(self.performance.success_rate, 3),
                "error_rate": round(self.performance.error_rate, 3),
                "error_impact": round(error_impact, 3),
            },
        }

    def _generate_recommendations(self) -> List[str]:
        """Return advice strings for every threshold currently exceeded.

        The list is never empty: when no threshold is exceeded it holds a
        single "operating normally" message.
        """
        recommendations = []

        if self.performance.success_rate < 0.9:
            recommendations.append(
                "Consider reviewing error logs and optimizing frequently used capabilities"
            )
        if self.performance.avg_response_time > 200:
            recommendations.append(
                "System response times are elevated - consider resource optimization"
            )
        if self.status.recent_errors > 5:
            recommendations.append(
                "High error rate detected - system healing recommended"
            )
        if self.status.active_workflows > 3:
            recommendations.append(
                "Multiple concurrent workflows - consider load balancing"
            )
        if self.status.learning_rate < 0.1:
            recommendations.append(
                "Learning rate is low - consider increasing feedback collection"
            )

        if not recommendations:
            recommendations.append(
                "System operating normally - no immediate actions required"
            )
        return recommendations

    def get_llm_context(self) -> str:
        """Render the context summary as Markdown text for an LLM prompt."""
        context = self.get_context_summary()
        status = context["system_status"]
        perf = context["performance_metrics"]
        health = context["system_health"]

        # Collect fragments and join once instead of repeated concatenation.
        parts = [
            "## Current System Context:\n\n",
            # System Status
            "### System Status\n",
            f"- **ACP Status**: {status['acp_status']}\n",
            f"- **Active Workflows**: {status['active_workflows']}\n",
            f"- **Performance Score**: {status['performance_score']:.2f}\n",
            f"- **Recent Errors**: {status['recent_errors']}\n\n",
            # Performance Metrics
            "### Performance Metrics\n",
            f"- **Response Time**: {perf['avg_response_time']:.0f}ms\n",
            f"- **Success Rate**: {perf['success_rate']:.1%}\n",
            f"- **Error Rate**: {perf['error_rate']:.1%}\n",
            f"- **Throughput**: {perf['throughput']} req/min\n\n",
            # System Health
            "### System Health\n",
            f"- **Overall**: {health['status']} ({health['overall_score']:.2f})\n\n",
        ]

        # Recommendations
        if context["recommendations"]:
            parts.append("### Recommendations\n")
            parts.extend(f"- {rec}\n" for rec in context["recommendations"])
            parts.append("\n")

        return "".join(parts)

    @staticmethod
    def _parse_timestamp(raw: Any) -> datetime:
        """Turn a persisted timestamp into a datetime, or now if unusable.

        A missing, null or malformed value falls back to the current time so
        that one bad field does not abort the rest of a restore.
        """
        if isinstance(raw, datetime):
            return raw
        try:
            return datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            return datetime.now()

    async def load_context(self) -> None:
        """Restore status and metrics from persistence, at most once.

        Does nothing without a persistence backend or after a previous
        successful load. Keys missing from the stored data fall back to
        defaults. Any exception is logged and otherwise ignored.
        """
        if not self.persistence or self._context_loaded:
            return

        try:
            loaded = await self.persistence.load_system_context()
            if not loaded:
                # Nothing stored yet; leave the flag unset so a later call retries.
                return

            # Restore system status
            if "system_status" in loaded:
                status_data = loaded["system_status"]
                # NOTE(review): the stored workflow count is restored but the
                # workflow list is not, so the two can disagree until the next
                # add_workflow/remove_workflow call.
                self.status = SystemStatus(
                    acp_status=status_data.get("acp_status", "active"),
                    active_workflows=status_data.get("active_workflows", 0),
                    recent_errors=status_data.get("recent_errors", 0),
                    performance_score=status_data.get("performance_score", 0.8),
                    learning_rate=status_data.get("learning_rate", 0.1),
                    last_update=self._parse_timestamp(
                        status_data.get("last_update")
                    ),
                )

            # Restore performance metrics
            if "performance_metrics" in loaded:
                perf_data = loaded["performance_metrics"]
                # NOTE(review): this resource_usage default differs from the
                # one used in __init__ - confirm which is intended.
                self.performance = PerformanceMetrics(
                    avg_response_time=perf_data.get("avg_response_time", 150.0),
                    success_rate=perf_data.get("success_rate", 0.95),
                    error_rate=perf_data.get("error_rate", 0.05),
                    throughput=perf_data.get("throughput", 10),
                    resource_usage=perf_data.get(
                        "resource_usage", {"cpu": 0.5, "memory": 0.6}
                    ),
                )

            self._context_loaded = True
        except Exception as e:
            logger.warning(
                "Failed to load system context from persistence: %s", e
            )

    async def save_context(self) -> None:
        """Write the context summary to persistence, if a backend is set.

        Any exception is logged and otherwise ignored.
        """
        if not self.persistence:
            return
        try:
            context_data = self.get_context_summary()
            await self.persistence.save_system_context(context_data)
        except Exception as e:
            logger.warning(
                "Failed to save system context to persistence: %s", e
            )

    def get_system_overview(self) -> Dict[str, Any]:
        """Return a small status dict for routing decisions.

        ``health_score`` is the performance score as a whole-number
        percentage. It is rounded rather than truncated, so a score of 0.57
        reports 57 even though ``0.57 * 100`` is slightly below 57 in
        floating point.
        """
        return {
            "health_score": round(self.status.performance_score * 100),
            "active_workflows": self.status.active_workflows,
            "acp_status": self.status.acp_status,
            "recent_errors": self.status.recent_errors,
        }