"""
Autonomous Architect Agent - Dynamic DAG Strategy Generation
This agent replaces the orchestrator's static workflows with intelligent strategy
formulation: it analyzes user goals and generates executable DAGs for the
orchestrator to follow.
"""
import asyncio
import json
import uuid
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from enum import Enum
import structlog
from src.core.llm_manager import LLMManager
logger = structlog.get_logger()
class TaskType(Enum):
"""Types of tasks in the DAG"""
ANALYSIS = "analysis"
DATABASE_DESIGN = "database_design"
API_DEVELOPMENT = "api_development"
FRONTEND_DEVELOPMENT = "frontend_development"
INTEGRATION = "integration"
TESTING = "testing"
SECURITY_AUDIT = "security_audit"
DEPLOYMENT = "deployment"
MONITORING = "monitoring"
OPTIMIZATION = "optimization"
class Priority(Enum):
"""Task priority levels"""
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
OPTIONAL = 5
@dataclass
class TaskNode:
"""Individual task node in the DAG"""
id: str
name: str
type: TaskType
description: str
agent_type: str # frontend, backend, reviewer, devops
priority: Priority
dependencies: List[str]
parameters: Dict[str, Any]
estimated_duration_minutes: int
success_criteria: List[str]
verification_steps: List[str]
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"type": self.type.value,
"description": self.description,
"agent_type": self.agent_type,
"priority": self.priority.value,
"dependencies": self.dependencies,
"parameters": self.parameters,
"estimated_duration_minutes": self.estimated_duration_minutes,
"success_criteria": self.success_criteria,
"verification_steps": self.verification_steps
}
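# Note (illustrative): to_dict() flattens the enum fields to their plain values,
# so a TaskNode with type=TaskType.TESTING and priority=Priority.HIGH serializes
# as {"type": "testing", "priority": 2, ...} and is JSON-safe without a custom
# encoder.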
@dataclass
class ExecutionDAG:
"""Complete execution plan as a Directed Acyclic Graph"""
dag_id: str
name: str
description: str
nodes: List[TaskNode]
execution_phases: List[List[str]] # Ordered phases of task IDs
total_estimated_duration: int
critical_path: List[str]
risk_assessment: Dict[str, Any]
success_metrics: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
return {
"dag_id": self.dag_id,
"name": self.name,
"description": self.description,
"nodes": [node.to_dict() for node in self.nodes],
"execution_phases": self.execution_phases,
"total_estimated_duration": self.total_estimated_duration,
"critical_path": self.critical_path,
"risk_assessment": self.risk_assessment,
"success_metrics": self.success_metrics
}
class ArchitectAgent:
"""
Autonomous Architect Agent - The Strategic Brain
This agent is responsible for analyzing user goals and generating dynamic,
intelligent execution plans (DAGs) for the orchestrator to follow.
"""
def __init__(self, correlation_id: Optional[str] = None):
self.correlation_id = correlation_id or str(uuid.uuid4())
self.logger = logger.bind(correlation_id=self.correlation_id)
        try:
            self.llm_manager = LLMManager()
        except Exception as e:
            self.llm_manager = None
            self.logger.warning(
                "llm_manager_unavailable",
                error=str(e),
                fallback="using fallback architecture generation"
            )
# Architecture patterns and templates
self.architecture_patterns = self._load_architecture_patterns()
self.task_templates = self._load_task_templates()
self.logger.info(
"architect_agent_initialized",
correlation_id=self.correlation_id,
patterns_loaded=len(self.architecture_patterns),
templates_loaded=len(self.task_templates)
)
async def generate_execution_strategy(
self,
user_goal: str,
constraints: Optional[Dict[str, Any]] = None,
preferences: Optional[Dict[str, Any]] = None
) -> ExecutionDAG:
"""
Generate a complete execution strategy as a DAG
Args:
user_goal: The user's high-level objective
constraints: Technical or business constraints
preferences: User preferences for technology, approach, etc.
Returns:
ExecutionDAG: Complete execution plan
"""
dag_id = str(uuid.uuid4())
try:
self.logger.info(
"strategy_generation_started",
dag_id=dag_id,
goal_length=len(user_goal),
has_constraints=constraints is not None,
has_preferences=preferences is not None
)
# Phase 1: Analyze and decompose the goal
goal_analysis = await self._analyze_user_goal(user_goal, constraints, preferences)
# Phase 2: Select optimal architecture pattern
architecture_pattern = await self._select_architecture_pattern(goal_analysis)
# Phase 3: Generate task nodes
task_nodes = await self._generate_task_nodes(goal_analysis, architecture_pattern)
# Phase 4: Build dependency graph
dependency_graph = await self._build_dependency_graph(task_nodes, goal_analysis)
# Phase 5: Optimize execution plan
optimized_plan = await self._optimize_execution_plan(dependency_graph, goal_analysis)
# Phase 6: Generate risk assessment
risk_assessment = await self._assess_execution_risks(optimized_plan, goal_analysis)
# Phase 7: Define success metrics
success_metrics = await self._define_success_metrics(goal_analysis, optimized_plan)
# Create the final DAG
execution_dag = ExecutionDAG(
dag_id=dag_id,
name=goal_analysis.get("project_name", "Generated Project"),
description=goal_analysis.get("detailed_description", user_goal),
nodes=optimized_plan["nodes"],
execution_phases=optimized_plan["execution_phases"],
total_estimated_duration=optimized_plan["total_duration"],
critical_path=optimized_plan["critical_path"],
risk_assessment=risk_assessment,
success_metrics=success_metrics
)
self.logger.info(
"strategy_generation_completed",
dag_id=dag_id,
total_tasks=len(execution_dag.nodes),
phases=len(execution_dag.execution_phases),
duration_hours=execution_dag.total_estimated_duration / 60
)
return execution_dag
except Exception as e:
self.logger.error(
"strategy_generation_failed",
dag_id=dag_id,
error=str(e)
)
raise
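    # Illustrative call (a sketch): constraints and preferences are free-form
    # dicts interpreted by the analysis prompt rather than a fixed schema:
    #
    #     architect = ArchitectAgent()
    #     dag = await architect.generate_execution_strategy(
    #         user_goal="Build an inventory API for a small warehouse",
    #         constraints={"deadline_days": 14, "team_size": 2},
    #         preferences={"tech_stack": ["FastAPI", "PostgreSQL"]},
    #     )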
async def _analyze_user_goal(
self,
user_goal: str,
constraints: Optional[Dict[str, Any]],
preferences: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""Analyze and decompose the user's goal into structured requirements"""
if not self.llm_manager:
return self._fallback_goal_analysis(user_goal, constraints, preferences)
analysis_prompt = f"""
You are a 10x Solutions Architect. Analyze this user goal and extract structured requirements.
USER GOAL: {user_goal}
CONSTRAINTS: {json.dumps(constraints or {}, indent=2)}
PREFERENCES: {json.dumps(preferences or {}, indent=2)}
Provide a comprehensive analysis in JSON format with:
1. PROJECT_METADATA:
- project_name: Clear, descriptive name
- project_type: (web_app, mobile_app, api, desktop_app, etc.)
- complexity_level: (simple, moderate, complex, enterprise)
- estimated_scope: (prototype, mvp, full_product, enterprise)
2. FUNCTIONAL_REQUIREMENTS:
- core_features: List of essential features
- user_types: Types of users/roles
- data_entities: Key data objects
- business_logic: Core business rules
3. TECHNICAL_REQUIREMENTS:
- preferred_technologies: Based on preferences or best practices
- scalability_needs: Expected load and growth
- performance_requirements: Speed, throughput expectations
- security_requirements: Authentication, authorization, data protection
- integration_needs: External services, APIs, databases
4. ARCHITECTURE_INDICATORS:
- suggested_pattern: (monolithic, microservices, serverless, etc.)
- database_type: (sql, nosql, hybrid)
- deployment_target: (cloud, on-premise, hybrid)
- monitoring_level: (basic, standard, comprehensive)
5. SUCCESS_CRITERIA:
- primary_success_metrics: How to measure success
- quality_gates: Non-negotiable quality requirements
- acceptance_criteria: Definition of "done"
Respond ONLY with valid JSON.
"""
try:
result = await self.llm_manager.generate_completion(
prompt=analysis_prompt,
model="claude-3-opus",
temperature=0.2, # Low temperature for structured analysis
max_tokens=4000,
metadata={
"operation": "goal_analysis",
"correlation_id": self.correlation_id
}
)
return json.loads(result.content)
except Exception as e:
self.logger.warning(
"llm_goal_analysis_failed",
error=str(e),
fallback="using_fallback_analysis"
)
return self._fallback_goal_analysis(user_goal, constraints, preferences)
async def _select_architecture_pattern(self, goal_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Select the optimal architecture pattern based on analysis"""
project_type = goal_analysis.get("PROJECT_METADATA", {}).get("project_type", "web_app")
complexity = goal_analysis.get("PROJECT_METADATA", {}).get("complexity_level", "moderate")
scalability = goal_analysis.get("TECHNICAL_REQUIREMENTS", {}).get("scalability_needs", "medium")
# Pattern selection logic
if complexity == "enterprise" or scalability == "high":
pattern_key = "microservices"
elif project_type == "api":
pattern_key = "api_first"
elif complexity == "simple":
pattern_key = "monolithic"
else:
pattern_key = "layered_monolith"
selected_pattern = self.architecture_patterns.get(pattern_key, self.architecture_patterns["layered_monolith"])
self.logger.info(
"architecture_pattern_selected",
pattern=pattern_key,
project_type=project_type,
complexity=complexity
)
return selected_pattern
async def _generate_task_nodes(
self,
goal_analysis: Dict[str, Any],
architecture_pattern: Dict[str, Any]
) -> List[TaskNode]:
"""Generate specific task nodes based on analysis and pattern"""
if not self.llm_manager:
return self._fallback_task_generation(goal_analysis, architecture_pattern)
task_generation_prompt = f"""
You are a 10x Project Manager specializing in software development workflows.
Generate a comprehensive list of tasks for this project.
GOAL ANALYSIS:
{json.dumps(goal_analysis, indent=2)}
ARCHITECTURE PATTERN:
{json.dumps(architecture_pattern, indent=2)}
Generate tasks in JSON format. Each task must have:
{{
"name": "Clear, actionable task name",
"type": "analysis|database_design|api_development|frontend_development|integration|testing|security_audit|deployment|monitoring|optimization",
"description": "Detailed description of what needs to be done",
"agent_type": "frontend|backend|reviewer|devops",
"priority": 1-5 (1=critical, 5=optional),
"dependencies": ["list", "of", "task", "names", "this", "depends", "on"],
"estimated_duration_minutes": integer,
"success_criteria": ["specific", "measurable", "criteria"],
"verification_steps": ["how", "to", "verify", "completion"],
"parameters": {{
"any": "specific parameters needed for this task"
}}
}}
Focus on:
1. Logical task breakdown following the architecture pattern
2. Clear dependencies between tasks
3. Realistic time estimates
4. Specific, measurable success criteria
5. Proper agent assignment based on expertise
Return as JSON array of tasks.
"""
try:
result = await self.llm_manager.generate_completion(
prompt=task_generation_prompt,
model="claude-3-sonnet",
temperature=0.3,
max_tokens=6000,
metadata={
"operation": "task_generation",
"correlation_id": self.correlation_id
}
)
tasks_data = json.loads(result.content)
# Convert to TaskNode objects
task_nodes = []
for task_data in tasks_data:
task_node = TaskNode(
id=str(uuid.uuid4()),
name=task_data["name"],
type=TaskType(task_data["type"]),
description=task_data["description"],
agent_type=task_data["agent_type"],
priority=Priority(task_data["priority"]),
dependencies=task_data["dependencies"],
parameters=task_data.get("parameters", {}),
estimated_duration_minutes=task_data["estimated_duration_minutes"],
success_criteria=task_data["success_criteria"],
verification_steps=task_data["verification_steps"]
)
task_nodes.append(task_node)
return task_nodes
except Exception as e:
self.logger.warning(
"llm_task_generation_failed",
error=str(e),
fallback="using_fallback_tasks"
)
return self._fallback_task_generation(goal_analysis, architecture_pattern)
async def _build_dependency_graph(
self,
task_nodes: List[TaskNode],
goal_analysis: Dict[str, Any]
) -> Dict[str, Any]:
"""Build and validate the dependency graph"""
        # Create name-to-ID mapping
        name_to_id = {node.name: node.id for node in task_nodes}
        known_ids = {node.id for node in task_nodes}
        # Resolve dependency names to IDs; keep entries that are already valid
        # IDs, and drop (with a warning) anything that resolves to neither
        for node in task_nodes:
            resolved = []
            for dep in node.dependencies:
                if dep in name_to_id:
                    resolved.append(name_to_id[dep])
                elif dep in known_ids:
                    resolved.append(dep)
                else:
                    self.logger.warning("unknown_dependency_dropped", task=node.name, dependency=dep)
            node.dependencies = resolved
# Calculate execution phases based on dependencies
execution_phases = self._calculate_execution_phases(task_nodes)
# Find critical path
critical_path = self._find_critical_path(task_nodes, execution_phases)
return {
"nodes": task_nodes,
"execution_phases": execution_phases,
"critical_path": critical_path
}
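    # Example (illustrative): if an "API Development" node lists "Database
    # Design" by name as a dependency, the resolution above rewrites that name
    # to the Database Design node's generated UUID, so phase calculation and
    # critical-path analysis can work purely with IDs.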
async def _optimize_execution_plan(
self,
dependency_graph: Dict[str, Any],
goal_analysis: Dict[str, Any]
) -> Dict[str, Any]:
"""Optimize the execution plan for efficiency"""
nodes = dependency_graph["nodes"]
execution_phases = dependency_graph["execution_phases"]
critical_path = dependency_graph["critical_path"]
        # Total duration assumes tasks within a phase run in parallel, so each
        # phase contributes only its longest task
        total_duration = 0
        for phase in execution_phases:
            phase_ids = set(phase)
            phase_duration = max(
                (node.estimated_duration_minutes for node in nodes if node.id in phase_ids),
                default=0
            )
            total_duration += phase_duration
# Optimize phase ordering for resource utilization
optimized_phases = self._optimize_phase_ordering(execution_phases, nodes)
return {
"nodes": nodes,
"execution_phases": optimized_phases,
"critical_path": critical_path,
"total_duration": total_duration
}
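    # Worked example (illustrative numbers): given phases
    #   [[A (30 min), B (45 min)], [C (60 min)]]
    # A and B run in parallel, so the plan costs max(30, 45) + 60 = 105 minutes
    # rather than the serial 30 + 45 + 60 = 135.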
async def _assess_execution_risks(
self,
execution_plan: Dict[str, Any],
goal_analysis: Dict[str, Any]
) -> Dict[str, Any]:
"""Assess risks and generate mitigation strategies"""
risks = []
# Complexity risk assessment
complexity = goal_analysis.get("PROJECT_METADATA", {}).get("complexity_level", "moderate")
if complexity == "enterprise":
risks.append({
"risk": "High complexity may lead to scope creep",
"probability": "medium",
"impact": "high",
"mitigation": "Regular checkpoint reviews and scope validation"
})
# Timeline risk assessment
total_duration = execution_plan["total_duration"]
if total_duration > 480: # More than 8 hours
risks.append({
"risk": "Extended timeline may impact delivery",
"probability": "medium",
"impact": "medium",
"mitigation": "Break into smaller deliverable phases"
})
# Dependency risk assessment
critical_path_length = len(execution_plan["critical_path"])
if critical_path_length > 5:
risks.append({
"risk": "Long critical path may cause delays",
"probability": "medium",
"impact": "high",
"mitigation": "Identify parallel execution opportunities"
})
return {
"risks": risks,
"overall_risk_level": "medium" if len(risks) > 2 else "low",
"mitigation_strategies": [risk["mitigation"] for risk in risks]
}
async def _define_success_metrics(
self,
goal_analysis: Dict[str, Any],
execution_plan: Dict[str, Any]
) -> Dict[str, Any]:
"""Define comprehensive success metrics"""
return {
"completion_criteria": {
"all_tasks_completed": True,
"success_criteria_met": "100%",
"verification_steps_passed": "100%"
},
"quality_metrics": {
"code_coverage": ">= 80%",
"security_scan_passed": True,
"performance_requirements_met": True
},
"business_metrics": goal_analysis.get("SUCCESS_CRITERIA", {}).get("primary_success_metrics", []),
"timeline_metrics": {
"delivery_on_time": True,
"estimated_vs_actual_duration": "<= 120%"
}
}
def _calculate_execution_phases(self, task_nodes: List[TaskNode]) -> List[List[str]]:
"""Calculate execution phases based on dependencies"""
# Track which tasks have been assigned to phases
assigned = set()
phases = []
while len(assigned) < len(task_nodes):
current_phase = []
for node in task_nodes:
if node.id in assigned:
continue
# Check if all dependencies are satisfied
deps_satisfied = all(dep_id in assigned for dep_id in node.dependencies)
if deps_satisfied:
current_phase.append(node.id)
            if not current_phase:
                # No task is ready, so the remaining tasks must form a
                # dependency cycle; break it by scheduling them all at once
                remaining = [node.id for node in task_nodes if node.id not in assigned]
                self.logger.warning("circular_dependencies_detected", tasks=remaining)
                current_phase = remaining
phases.append(current_phase)
assigned.update(current_phase)
return phases
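    # Example (illustrative): for tasks A (no deps), B (no deps), and C
    # (depending on A and B), the loop produces phases [[A, B], [C]] - a
    # Kahn-style topological layering in which every task within a phase can
    # run concurrently.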
def _find_critical_path(self, task_nodes: List[TaskNode], execution_phases: List[List[str]]) -> List[str]:
"""Find the critical path through the task graph"""
        # Simplified heuristic: this takes the longest task from each phase
        # rather than computing the true longest dependency chain
id_to_node = {node.id: node for node in task_nodes}
critical_path = []
for phase in execution_phases:
# Find the longest task in this phase
longest_task = max(
phase,
key=lambda task_id: id_to_node[task_id].estimated_duration_minutes
)
critical_path.append(longest_task)
return critical_path
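    # A sketch (not wired in) of an exact critical path, computed as the
    # longest-duration chain through the dependency DAG with memoized DFS;
    # it assumes node.dependencies already hold valid task IDs and the graph
    # is acyclic:
    #
    #     def _exact_critical_path(self, task_nodes: List[TaskNode]) -> List[str]:
    #         id_to_node = {n.id: n for n in task_nodes}
    #         memo: Dict[str, Any] = {}
    #         def longest(nid: str):
    #             if nid not in memo:
    #                 node = id_to_node[nid]
    #                 best = max((longest(d) for d in node.dependencies),
    #                            default=(0, []))
    #                 memo[nid] = (best[0] + node.estimated_duration_minutes,
    #                              best[1] + [nid])
    #             return memo[nid]
    #         return max(longest(n.id) for n in task_nodes)[1]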
def _optimize_phase_ordering(self, execution_phases: List[List[str]], nodes: List[TaskNode]) -> List[List[str]]:
"""Optimize phase ordering for better resource utilization"""
# For now, return phases as-is
# Future optimization could consider agent load balancing
return execution_phases
def _fallback_goal_analysis(
self,
user_goal: str,
constraints: Optional[Dict[str, Any]],
preferences: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
"""Fallback goal analysis when LLM is not available"""
return {
"PROJECT_METADATA": {
"project_name": "Generated Project",
"project_type": "web_app",
"complexity_level": "moderate",
"estimated_scope": "mvp"
},
"FUNCTIONAL_REQUIREMENTS": {
"core_features": ["user interface", "data management", "basic operations"],
"user_types": ["end_user"],
"data_entities": ["user", "data"],
"business_logic": ["crud_operations"]
},
"TECHNICAL_REQUIREMENTS": {
"preferred_technologies": preferences.get("tech_stack", ["React", "FastAPI", "PostgreSQL"]) if preferences else ["React", "FastAPI", "PostgreSQL"],
"scalability_needs": "medium",
"performance_requirements": "standard",
"security_requirements": "basic_auth",
"integration_needs": []
},
"ARCHITECTURE_INDICATORS": {
"suggested_pattern": "layered_monolith",
"database_type": "sql",
"deployment_target": "cloud",
"monitoring_level": "basic"
},
"SUCCESS_CRITERIA": {
"primary_success_metrics": ["functional_completeness", "user_satisfaction"],
"quality_gates": ["testing_passed", "security_basic"],
"acceptance_criteria": ["meets_requirements"]
}
}
def _fallback_task_generation(
self,
goal_analysis: Dict[str, Any],
architecture_pattern: Dict[str, Any]
) -> List[TaskNode]:
"""Fallback task generation when LLM is not available"""
fallback_tasks = [
TaskNode(
id=str(uuid.uuid4()),
name="Project Analysis",
type=TaskType.ANALYSIS,
description="Analyze project requirements and create technical specifications",
agent_type="reviewer",
priority=Priority.CRITICAL,
dependencies=[],
parameters={"analysis_type": "requirements"},
estimated_duration_minutes=30,
success_criteria=["Requirements documented", "Technical specifications created"],
verification_steps=["Review documentation", "Validate requirements"]
),
TaskNode(
id=str(uuid.uuid4()),
name="Database Design",
type=TaskType.DATABASE_DESIGN,
description="Design database schema and data models",
agent_type="backend",
priority=Priority.CRITICAL,
dependencies=["Project Analysis"],
parameters={"database_type": "postgresql"},
estimated_duration_minutes=45,
success_criteria=["Schema designed", "Relationships defined"],
verification_steps=["Schema validation", "Performance review"]
),
TaskNode(
id=str(uuid.uuid4()),
name="API Development",
type=TaskType.API_DEVELOPMENT,
description="Develop REST API endpoints and business logic",
agent_type="backend",
priority=Priority.HIGH,
dependencies=["Database Design"],
parameters={"framework": "fastapi"},
estimated_duration_minutes=90,
success_criteria=["All endpoints implemented", "API documentation created"],
verification_steps=["API testing", "Documentation review"]
),
TaskNode(
id=str(uuid.uuid4()),
name="Frontend Development",
type=TaskType.FRONTEND_DEVELOPMENT,
description="Build user interface and user experience",
agent_type="frontend",
priority=Priority.HIGH,
dependencies=["Project Analysis"],
parameters={"framework": "react"},
estimated_duration_minutes=120,
success_criteria=["UI components created", "User flows implemented"],
verification_steps=["UI testing", "Accessibility check"]
),
TaskNode(
id=str(uuid.uuid4()),
name="Integration Testing",
type=TaskType.TESTING,
description="Integrate frontend and backend, test end-to-end functionality",
agent_type="reviewer",
priority=Priority.HIGH,
dependencies=["API Development", "Frontend Development"],
parameters={"test_type": "integration"},
estimated_duration_minutes=60,
success_criteria=["Integration complete", "All tests passing"],
verification_steps=["E2E test execution", "Performance validation"]
)
]
return fallback_tasks
def _load_architecture_patterns(self) -> Dict[str, Any]:
"""Load predefined architecture patterns"""
return {
"monolithic": {
"name": "Monolithic Architecture",
"description": "Single deployable unit with all components",
"components": ["web_layer", "business_layer", "data_layer"],
"deployment": "single_instance"
},
"layered_monolith": {
"name": "Layered Monolithic Architecture",
"description": "Monolith with clear layer separation",
"components": ["presentation", "business", "persistence", "database"],
"deployment": "single_instance_layered"
},
"microservices": {
"name": "Microservices Architecture",
"description": "Distributed system with independent services",
"components": ["api_gateway", "user_service", "business_service", "data_service"],
"deployment": "distributed_containers"
},
"api_first": {
"name": "API-First Architecture",
"description": "API-centric design with multiple consumers",
"components": ["core_api", "documentation", "client_sdks"],
"deployment": "api_focused"
}
}
def _load_task_templates(self) -> Dict[str, Any]:
"""Load predefined task templates"""
return {
"web_development": {
"phases": ["analysis", "design", "development", "testing", "deployment"],
"required_agents": ["frontend", "backend", "reviewer", "devops"]
},
"api_development": {
"phases": ["specification", "implementation", "documentation", "testing"],
"required_agents": ["backend", "reviewer"]
},
"mobile_development": {
"phases": ["platform_setup", "development", "testing", "deployment"],
"required_agents": ["frontend", "reviewer", "devops"]
}
}
async def regenerate_strategy(
self,
original_dag: ExecutionDAG,
execution_feedback: Dict[str, Any],
user_adjustments: Optional[Dict[str, Any]] = None
) -> ExecutionDAG:
"""
Regenerate strategy based on execution feedback and user adjustments
This enables the architect to learn and adapt strategies based on real execution results.
"""
self.logger.info(
"strategy_regeneration_started",
original_dag_id=original_dag.dag_id,
feedback_keys=list(execution_feedback.keys())
)
# Analyze what went wrong/right in the original execution
execution_analysis = await self._analyze_execution_feedback(execution_feedback)
# Apply lessons learned to regenerate improved strategy
improved_dag = await self._apply_improvements(original_dag, execution_analysis, user_adjustments)
self.logger.info(
"strategy_regeneration_completed",
original_dag_id=original_dag.dag_id,
new_dag_id=improved_dag.dag_id,
improvements_applied=len(execution_analysis.get("improvements", []))
)
return improved_dag
async def _analyze_execution_feedback(self, execution_feedback: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze execution feedback to identify improvement opportunities"""
# Extract key metrics from feedback
completion_rate = execution_feedback.get("completion_rate", 0.0)
failed_tasks = execution_feedback.get("failed_tasks", [])
duration_variance = execution_feedback.get("duration_variance", 0.0)
quality_issues = execution_feedback.get("quality_issues", [])
improvements = []
if completion_rate < 0.9:
improvements.append({
"area": "task_reliability",
"issue": "Low completion rate",
"recommendation": "Add more robust error handling and fallback strategies"
})
if failed_tasks:
improvements.append({
"area": "dependency_management",
"issue": f"Tasks failed: {failed_tasks}",
"recommendation": "Review task dependencies and prerequisites"
})
if duration_variance > 0.5:
improvements.append({
"area": "time_estimation",
"issue": "High duration variance",
"recommendation": "Improve time estimation accuracy"
})
if quality_issues:
improvements.append({
"area": "quality_control",
"issue": f"Quality issues: {quality_issues}",
"recommendation": "Add more comprehensive quality gates"
})
return {
"execution_summary": execution_feedback,
"improvements": improvements,
"success_patterns": execution_feedback.get("successful_patterns", []),
"failure_patterns": execution_feedback.get("failure_patterns", [])
}
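    # Example (illustrative): feedback of
    #   {"completion_rate": 0.75, "failed_tasks": ["API Development"],
    #    "duration_variance": 0.6, "quality_issues": []}
    # trips the first three checks above and yields improvements targeting
    # task_reliability, dependency_management, and time_estimation.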
async def _apply_improvements(
self,
original_dag: ExecutionDAG,
execution_analysis: Dict[str, Any],
user_adjustments: Optional[Dict[str, Any]]
) -> ExecutionDAG:
"""Apply improvements to create an enhanced DAG"""
# Create new DAG with improvements
new_dag_id = str(uuid.uuid4())
improved_nodes = []
# Apply improvements to each node
for node in original_dag.nodes:
improved_node = self._improve_task_node(node, execution_analysis)
improved_nodes.append(improved_node)
# Apply user adjustments if provided
if user_adjustments:
improved_nodes = self._apply_user_adjustments(improved_nodes, user_adjustments)
# Recalculate execution plan
execution_phases = self._calculate_execution_phases(improved_nodes)
critical_path = self._find_critical_path(improved_nodes, execution_phases)
        # Use the same parallel-aware duration model as _optimize_execution_plan:
        # each phase contributes only its longest task
        id_to_node = {node.id: node for node in improved_nodes}
        total_duration = sum(
            max((id_to_node[task_id].estimated_duration_minutes for task_id in phase), default=0)
            for phase in execution_phases
        )
return ExecutionDAG(
dag_id=new_dag_id,
name=f"{original_dag.name} (Improved)",
description=f"Improved version based on execution feedback",
nodes=improved_nodes,
execution_phases=execution_phases,
total_estimated_duration=total_duration,
critical_path=critical_path,
risk_assessment=original_dag.risk_assessment, # Could be recalculated
success_metrics=original_dag.success_metrics
)
def _improve_task_node(self, node: TaskNode, execution_analysis: Dict[str, Any]) -> TaskNode:
"""Improve a single task node based on execution analysis"""
        # Create an improved copy of the node; the original ID is preserved so
        # that dependency references from sibling nodes remain valid
        improved_node = TaskNode(
            id=node.id,
name=node.name,
type=node.type,
description=node.description,
agent_type=node.agent_type,
priority=node.priority,
dependencies=node.dependencies.copy(),
parameters=node.parameters.copy(),
estimated_duration_minutes=node.estimated_duration_minutes,
success_criteria=node.success_criteria.copy(),
verification_steps=node.verification_steps.copy()
)
# Apply improvements based on analysis
for improvement in execution_analysis.get("improvements", []):
if improvement["area"] == "task_reliability":
improved_node.verification_steps.append("Validate prerequisites before execution")
improved_node.parameters["error_handling"] = "enhanced"
elif improvement["area"] == "time_estimation":
                # Pad the estimate by a flat 20% when past estimates proved unreliable
improved_node.estimated_duration_minutes = int(node.estimated_duration_minutes * 1.2)
elif improvement["area"] == "quality_control":
improved_node.success_criteria.append("Pass automated quality checks")
improved_node.verification_steps.append("Run quality validation suite")
return improved_node
def _apply_user_adjustments(
self,
nodes: List[TaskNode],
user_adjustments: Dict[str, Any]
) -> List[TaskNode]:
"""Apply user-requested adjustments to the DAG"""
# This would handle user requests like:
# - "Add more testing phases"
# - "Use different technology stack"
# - "Reduce complexity"
# - "Add security review"
adjusted_nodes = nodes.copy()
if "add_security_review" in user_adjustments:
security_node = TaskNode(
id=str(uuid.uuid4()),
name="Enhanced Security Review",
type=TaskType.SECURITY_AUDIT,
description="Comprehensive security audit and penetration testing",
agent_type="reviewer",
priority=Priority.HIGH,
dependencies=[node.id for node in nodes if node.type == TaskType.API_DEVELOPMENT],
parameters={"security_level": "comprehensive"},
estimated_duration_minutes=90,
success_criteria=["Security scan passed", "Vulnerabilities addressed"],
verification_steps=["Automated security scan", "Manual security review"]
)
adjusted_nodes.append(security_node)
if "technology_change" in user_adjustments:
tech_changes = user_adjustments["technology_change"]
for node in adjusted_nodes:
if "framework" in node.parameters:
if node.parameters["framework"] in tech_changes:
node.parameters["framework"] = tech_changes[node.parameters["framework"]]
return adjusted_nodes
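# Minimal smoke-test sketch: running this module directly generates a strategy
# and prints a summary. It assumes src.core.llm_manager is importable; if no
# LLM backend is configured, LLMManager() raises and the agent's built-in
# fallback path is exercised, so the output reflects the fallback tasks rather
# than an LLM-generated plan.
if __name__ == "__main__":
    async def _demo() -> None:
        architect = ArchitectAgent()
        dag = await architect.generate_execution_strategy(
            user_goal="Build a simple task-tracking web app",
            preferences={"tech_stack": ["React", "FastAPI", "PostgreSQL"]},
        )
        print(
            f"DAG {dag.dag_id}: {len(dag.nodes)} tasks in "
            f"{len(dag.execution_phases)} phases, "
            f"~{dag.total_estimated_duration} min total"
        )
    asyncio.run(_demo())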