# workflow_optimizer.py
"""
Workflow Optimization Engine - Phase 3
Advanced workflow analysis and optimization using machine learning
and pattern recognition to improve capability orchestration.
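
Example (illustrative sketch; run inside an async function, the capability IDs
below are placeholder names):

    optimizer = WorkflowOptimizer()
    extract = WorkflowStep(step_name="extract", capability_id="cap.extract")
    load = WorkflowStep(
        step_name="load", capability_id="cap.load", depends_on=[extract.step_id]
    )
    workflow = WorkflowDefinition(name="etl", steps=[extract, load])
    workflow_id = await optimizer.register_workflow(workflow)
    execution = await optimizer.execute_workflow(workflow_id)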
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
import uuid
import logging
from .data_models import (
FeedbackEvent, PerformanceSnapshot, CapabilityProfile,
LearningRecord, AdaptationProposal, AdaptationType
)
# from .adaptive_learning import AdaptiveLearningManager
# from .performance_tracker import PerformanceTracker
# from .feedback import FeedbackCollector
logger = logging.getLogger(__name__)
class WorkflowPattern(Enum):
"""Types of workflow patterns"""
SEQUENTIAL = "sequential"
PARALLEL = "parallel"
PIPELINE = "pipeline"
FAN_OUT = "fan_out"
REDUCE = "reduce"
CONDITIONAL = "conditional"
ITERATIVE = "iterative"
class OptimizationStrategy(Enum):
"""Workflow optimization strategies"""
REORDER = "reorder"
PARALLELIZE = "parallelize"
CACHE = "cache"
BATCH = "batch"
SKIP = "skip"
MERGE = "merge"
SPLIT = "split"
@dataclass
class WorkflowStep:
"""Single step in a workflow"""
step_id: str = field(default_factory=lambda: str(uuid.uuid4()))
capability_id: str = ""
step_name: str = ""
step_type: str = "transform" # transform, validate, aggregate, etc.
# Dependencies
depends_on: List[str] = field(default_factory=list)
outputs_to: List[str] = field(default_factory=list)
# Execution parameters
timeout: float = 30.0
retry_count: int = 3
priority: int = 5 # 1-10, higher is more important
# Performance data
avg_execution_time: float = 0.0
success_rate: float = 1.0
resource_intensity: float = 0.5 # 0-1
# Optimization flags
can_parallelize: bool = True
can_cache: bool = False
cache_ttl: int = 300 # seconds
batch_size: Optional[int] = None
# Metadata
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class WorkflowDefinition:
"""Definition of a complete workflow"""
workflow_id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
version: str = "1.0.0"
# Workflow structure
steps: List[WorkflowStep] = field(default_factory=list)
patterns: List[WorkflowPattern] = field(default_factory=list)
# Execution constraints
max_parallel_steps: int = 5
total_timeout: float = 300.0
resource_limits: Dict[str, Any] = field(default_factory=dict)
# Optimization settings
auto_optimize: bool = True
optimization_frequency: int = 100 # executions
optimization_strategies: List[OptimizationStrategy] = field(default_factory=list)
# Performance tracking
total_executions: int = 0
avg_execution_time: float = 0.0
success_rate: float = 1.0
last_optimized: Optional[datetime] = None
# Metadata
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
tags: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class WorkflowExecution:
"""Record of a workflow execution"""
execution_id: str = field(default_factory=lambda: str(uuid.uuid4()))
workflow_id: str = ""
started_at: datetime = field(default_factory=datetime.now)
ended_at: Optional[datetime] = None
# Execution plan
execution_plan: List[str] = field(default_factory=list) # Step IDs in order
parallel_groups: List[List[str]] = field(default_factory=list)
# Results
step_results: Dict[str, Any] = field(default_factory=dict)
step_timings: Dict[str, float] = field(default_factory=dict)
step_errors: Dict[str, str] = field(default_factory=dict)
# Performance
total_time: float = 0.0
parallel_efficiency: float = 0.0 # Actual vs theoretical parallel speedup
resource_utilization: float = 0.0
# Status
status: str = "running" # running, completed, failed, cancelled
error_message: Optional[str] = None
# Optimization data
optimization_suggestions: List[str] = field(default_factory=list)
bottlenecks: List[str] = field(default_factory=list)
@dataclass
class OptimizationProposal:
"""Proposal for workflow optimization"""
proposal_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
workflow_id: str = ""
# Proposal details
strategy: OptimizationStrategy = OptimizationStrategy.REORDER
target_steps: List[str] = field(default_factory=list)
# Current vs proposed
current_structure: Dict[str, Any] = field(default_factory=dict)
proposed_structure: Dict[str, Any] = field(default_factory=dict)
# Expected impact
expected_time_reduction: float = 0.0 # percentage
expected_resource_reduction: float = 0.0 # percentage
expected_success_improvement: float = 0.0 # percentage
# Confidence and evidence
confidence_score: float = 0.0 # 0-1
supporting_data: List[str] = field(default_factory=list)
similar_workflows: List[str] = field(default_factory=list)
# Implementation
implementation_complexity: str = "low" # low, medium, high
rollback_plan: str = ""
testing_required: bool = True
# Status
status: str = "proposed" # proposed, approved, rejected, applied, rolled_back
reviewed_by: Optional[str] = None
reviewed_at: Optional[datetime] = None
applied_at: Optional[datetime] = None
# Results
actual_impact: Optional[Dict[str, float]] = None
side_effects: List[str] = field(default_factory=list)
# Additional fields for extensibility
reasoning: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
class WorkflowOptimizer:
"""Advanced workflow optimization engine"""
def __init__(self):
# self.adaptive_learning = adaptive_learning
# self.performance_tracker = performance_tracker
# self.feedback_collector = feedback_collector
# Workflow storage
self.workflows: Dict[str, WorkflowDefinition] = {}
self.executions: Dict[str, WorkflowExecution] = {}
self.proposals: Dict[str, OptimizationProposal] = {}
# Optimization state
self.pattern_cache: Dict[str, List[WorkflowPattern]] = {}
self.performance_history: Dict[str, List[float]] = {}
self.optimization_lock = asyncio.Lock()
# ML models (simplified for Phase 3)
self.bottleneck_detector = None
self.pattern_matcher = None
self.performance_predictor = None
logger.info("WorkflowOptimizer initialized")
async def register_workflow(self, workflow: WorkflowDefinition) -> str:
"""Register a new workflow for optimization"""
workflow_id = workflow.workflow_id
# Analyze workflow patterns
workflow.patterns = await self._analyze_patterns(workflow)
# Set default optimization strategies if not provided
if not workflow.optimization_strategies:
workflow.optimization_strategies = [
OptimizationStrategy.REORDER,
OptimizationStrategy.PARALLELIZE,
OptimizationStrategy.CACHE
]
self.workflows[workflow_id] = workflow
self.performance_history[workflow_id] = []
logger.info(f"Registered workflow: {workflow.name} ({workflow_id})")
return workflow_id
async def execute_workflow(
self,
workflow_id: str,
context: Optional[Dict[str, Any]] = None
) -> WorkflowExecution:
"""Execute a workflow with optimization"""
if workflow_id not in self.workflows:
raise ValueError(f"Workflow {workflow_id} not found")
workflow = self.workflows[workflow_id]
execution = WorkflowExecution(workflow_id=workflow_id)
try:
# Generate optimized execution plan
execution.execution_plan, execution.parallel_groups = await self._generate_execution_plan(workflow)
# Execute the workflow
await self._execute_workflow_steps(execution, workflow, context or {})
# Record execution
execution.ended_at = datetime.now()
execution.total_time = (execution.ended_at - execution.started_at).total_seconds()
execution.status = "completed"
# Update workflow statistics
await self._update_workflow_stats(workflow_id, execution)
# Check if optimization is needed
if workflow.auto_optimize and workflow.total_executions % workflow.optimization_frequency == 0:
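                # Fire-and-forget background optimization; a long-lived system
                # should keep a reference to this task so it is not garbage
                # collected before it completes.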
asyncio.create_task(self._optimize_workflow(workflow_id))
        except Exception as e:
            execution.ended_at = datetime.now()
            execution.total_time = (execution.ended_at - execution.started_at).total_seconds()
            execution.status = "failed"
            execution.error_message = str(e)
            logger.error(f"Workflow execution failed: {e}")
            # Record failed runs as well so success_rate reflects failures
            await self._update_workflow_stats(workflow_id, execution)
self.executions[execution.execution_id] = execution
return execution
async def _generate_execution_plan(
self,
workflow: WorkflowDefinition
) -> Tuple[List[str], List[List[str]]]:
"""Generate optimized execution plan"""
steps = {step.step_id: step for step in workflow.steps}
# Build dependency graph
dependencies = {step_id: step.depends_on for step_id, step in steps.items()}
        # Topological sort for basic ordering; keep only IDs that correspond to
        # defined steps in case a dependency references an unknown step
        plan = [step_id for step_id in self._topological_sort(dependencies) if step_id in steps]
# Identify parallelizable groups
parallel_groups = self._identify_parallel_groups(plan, dependencies, steps)
# Apply optimizations
if OptimizationStrategy.REORDER in workflow.optimization_strategies:
plan = await self._optimize_step_order(plan, steps)
return plan, parallel_groups
def _topological_sort(self, dependencies: Dict[str, List[str]]) -> List[str]:
"""Topological sort of workflow steps using Kahn's algorithm
Args:
dependencies: Dict where key = step_id, value = list of step_ids it depends on
Returns:
List of step_ids in topological order (dependencies first)
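
        Example (illustrative):
            _topological_sort({"a": [], "b": ["a"], "c": ["a", "b"]})
            -> ["a", "b", "c"]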
"""
# Initialize in-degree for all nodes to 0
all_nodes = set(dependencies.keys())
for deps in dependencies.values():
all_nodes.update(deps)
in_degree = {node: 0 for node in all_nodes}
# Calculate in-degree: count how many dependencies each node has
for node, deps in dependencies.items():
for dep in deps:
in_degree[node] += 1 # node depends on dep
# Start with nodes that have no dependencies (in-degree = 0)
queue = [node for node in all_nodes if in_degree[node] == 0]
result = []
while queue:
# Sort queue for deterministic results
queue.sort()
node = queue.pop(0)
result.append(node)
# Find nodes that depend on this node and reduce their in-degree
for other_node, deps in dependencies.items():
if node in deps:
in_degree[other_node] -= 1
if in_degree[other_node] == 0:
queue.append(other_node)
# Check if we have a valid topological order (no cycles)
if len(result) != len(all_nodes):
raise ValueError("Cycle detected in workflow dependencies")
return result
def _identify_parallel_groups(
self,
plan: List[str],
dependencies: Dict[str, List[str]],
steps: Dict[str, WorkflowStep]
) -> List[List[str]]:
"""Identify groups of steps that can run in parallel"""
groups = []
remaining = set(plan)
while remaining:
# Find steps with no unmet dependencies in remaining set
ready = []
for step_id in remaining:
deps = set(dependencies[step_id])
if deps.isdisjoint(remaining):
ready.append(step_id)
            # Keep only the steps explicitly flagged as safe to run in parallel
parallel_ready = [
step_id for step_id in ready
if steps[step_id].can_parallelize
]
if parallel_ready:
groups.append(parallel_ready)
remaining -= set(parallel_ready)
else:
# No parallelizable steps ready, execute single step
groups.append([ready[0]])
remaining.remove(ready[0])
return groups
async def _optimize_step_order(
self,
plan: List[str],
steps: Dict[str, WorkflowStep]
) -> List[str]:
"""Optimize step ordering based on performance data"""
# Simple heuristic: prioritize faster, more reliable steps
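        # Score in (0, 1]: reliability weighted 60% and speed 40%; the speed
        # term decays as 1 / (1 + avg_execution_time), so slower steps rank lower.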
step_scores = {
step_id: (
steps[step_id].success_rate * 0.6 +
(1.0 / (1.0 + steps[step_id].avg_execution_time)) * 0.4
)
for step_id in plan
}
# Sort within dependency constraints
optimized_plan = []
remaining = set(plan)
while remaining:
# Find steps with all dependencies satisfied
ready = [
step_id for step_id in remaining
if all(dep in optimized_plan for dep in steps[step_id].depends_on)
]
if not ready:
# Should not happen with valid DAG
ready = list(remaining)
# Sort by score (highest first)
ready.sort(key=lambda x: step_scores[x], reverse=True)
# Add best step
optimized_plan.append(ready[0])
remaining.remove(ready[0])
return optimized_plan
async def _execute_workflow_steps(
self,
execution: WorkflowExecution,
workflow: WorkflowDefinition,
context: Dict[str, Any]
):
"""Execute workflow steps according to plan"""
        steps = {step.step_id: step for step in workflow.steps}
        for group in execution.parallel_groups:
            if len(group) == 1:
                # Sequential execution
                step_id = group[0]
                await self._execute_single_step(step_id, steps[step_id], execution, context)
            else:
                # Parallel execution
                tasks = [
                    asyncio.create_task(
                        self._execute_single_step(step_id, steps[step_id], execution, context)
                    )
                    for step_id in group
                ]
                await asyncio.gather(*tasks, return_exceptions=True)
async def _execute_single_step(
self,
step_id: str,
step: WorkflowStep,
execution: WorkflowExecution,
context: Dict[str, Any]
):
"""Execute a single workflow step"""
        started = datetime.now()
        try:
            # Simulate step execution (a real implementation would invoke the capability)
            # Pseudo-random jitter factor in [0.8, 1.2) derived from the step ID
            jitter = 0.8 + 0.4 * ((hash(step_id) % 10) / 10)
            await asyncio.sleep(min(step.avg_execution_time * jitter, 0.1))  # Cap for demo
            # Record result
            execution.step_results[step_id] = {"status": "success", "data": f"Result of {step_id}"}
        except Exception as e:
            execution.step_errors[step_id] = str(e)
            execution.step_results[step_id] = {"status": "error", "error": str(e)}
        finally:
            # Record the step's own duration as soon as it finishes
            execution.step_timings[step_id] = (datetime.now() - started).total_seconds()
async def _update_workflow_stats(self, workflow_id: str, execution: WorkflowExecution):
"""Update workflow statistics after execution"""
if workflow_id not in self.workflows:
return
workflow = self.workflows[workflow_id]
workflow.total_executions += 1
# Update execution time
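        # Incremental mean: new_avg = (old_avg * (n - 1) + latest) / n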
if workflow.total_executions == 1:
workflow.avg_execution_time = execution.total_time
else:
workflow.avg_execution_time = (
(workflow.avg_execution_time * (workflow.total_executions - 1) + execution.total_time) /
workflow.total_executions
)
# Update success rate
if execution.status == "completed":
workflow.success_rate = (
(workflow.success_rate * (workflow.total_executions - 1) + 1.0) /
workflow.total_executions
)
else:
workflow.success_rate = (
(workflow.success_rate * (workflow.total_executions - 1)) /
workflow.total_executions
)
workflow.updated_at = datetime.now()
# Track performance history
self.performance_history[workflow_id].append(execution.total_time)
if len(self.performance_history[workflow_id]) > 1000:
self.performance_history[workflow_id] = self.performance_history[workflow_id][-500:]
async def _analyze_patterns(self, workflow: WorkflowDefinition) -> List[WorkflowPattern]:
"""Analyze workflow patterns"""
patterns = []
steps = workflow.steps
# Check for sequential pattern
if all(len(step.depends_on) <= 1 and len(step.outputs_to) <= 1 for step in steps):
patterns.append(WorkflowPattern.SEQUENTIAL)
# Check for parallel pattern
parallel_steps = sum(1 for step in steps if step.can_parallelize)
if parallel_steps > 1:
patterns.append(WorkflowPattern.PARALLEL)
# Check for pipeline pattern
if len(steps) > 2 and all(
len(step.depends_on) == 1 and len(step.outputs_to) == 1
for step in steps[1:-1]
):
patterns.append(WorkflowPattern.PIPELINE)
# Check for fan-out pattern
fan_out_steps = sum(1 for step in steps if len(step.outputs_to) > 1)
if fan_out_steps > 0:
patterns.append(WorkflowPattern.FAN_OUT)
# Check for reduce pattern
reduce_steps = sum(1 for step in steps if len(step.depends_on) > 1)
if reduce_steps > 0:
patterns.append(WorkflowPattern.REDUCE)
return patterns
async def _optimize_workflow(self, workflow_id: str):
"""Optimize a workflow based on performance data"""
async with self.optimization_lock:
if workflow_id not in self.workflows:
return
workflow = self.workflows[workflow_id]
# Analyze performance and identify optimization opportunities
proposals = await self._generate_optimization_proposals(workflow_id)
# Apply high-confidence proposals
for proposal in proposals:
if proposal.confidence_score > 0.8 and proposal.implementation_complexity == "low":
await self._apply_optimization(proposal)
async def _generate_optimization_proposals(self, workflow_id: str) -> List[OptimizationProposal]:
"""Generate optimization proposals for a workflow"""
proposals = []
workflow = self.workflows[workflow_id]
if workflow_id not in self.performance_history or len(self.performance_history[workflow_id]) < 10:
return proposals
performance_data = self.performance_history[workflow_id]
recent_performance = performance_data[-10:]
avg_recent = sum(recent_performance) / len(recent_performance)
# Analyze bottlenecks
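        # Recent runs more than 20% slower than the lifetime average are treated
        # as a performance regression worth parallelizing around.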
if avg_recent > workflow.avg_execution_time * 1.2:
# Performance degradation detected
proposal = await self._create_parallelization_proposal(workflow)
if proposal:
proposals.append(proposal)
# Analyze execution patterns
if workflow.total_executions > 50:
cache_proposal = await self._create_caching_proposal(workflow)
if cache_proposal:
proposals.append(cache_proposal)
return proposals
async def _create_parallelization_proposal(self, workflow: WorkflowDefinition) -> Optional[OptimizationProposal]:
"""Create proposal for parallelization optimization"""
# Find sequential steps that could be parallelized
sequential_steps = [
step for step in workflow.steps
if not step.can_parallelize and len(step.depends_on) <= 1
]
if len(sequential_steps) < 2:
return None
# Create proposal
proposal = OptimizationProposal(
workflow_id=workflow.workflow_id,
strategy=OptimizationStrategy.PARALLELIZE,
target_steps=[step.step_id for step in sequential_steps[:2]], # Start with 2 steps
confidence_score=0.7,
expected_time_reduction=15.0, # 15% improvement expected
implementation_complexity="medium"
)
proposal.reasoning = "Sequential steps identified that can be parallelized to improve throughput"
proposal.supporting_data = [
f"Found {len(sequential_steps)} sequential steps",
f"Current execution time: {workflow.avg_execution_time:.2f}s"
]
return proposal
async def _create_caching_proposal(self, workflow: WorkflowDefinition) -> Optional[OptimizationProposal]:
"""Create proposal for caching optimization"""
        # Find reliable steps that do not yet have result caching enabled
cacheable_steps = [
step for step in workflow.steps
if not step.can_cache and step.success_rate > 0.95
]
if not cacheable_steps:
return None
proposal = OptimizationProposal(
workflow_id=workflow.workflow_id,
strategy=OptimizationStrategy.CACHE,
target_steps=[step.step_id for step in cacheable_steps[:1]],
confidence_score=0.8,
expected_time_reduction=10.0,
implementation_complexity="low"
)
proposal.reasoning = "High-success steps identified as candidates for result caching"
proposal.supporting_data = [
f"Found {len(cacheable_steps)} cacheable steps",
f"Average success rate: {sum(s.success_rate for s in cacheable_steps) / len(cacheable_steps):.2%}"
]
return proposal
async def _apply_optimization(self, proposal: OptimizationProposal):
"""Apply an optimization proposal"""
if proposal.workflow_id not in self.workflows:
return
workflow = self.workflows[proposal.workflow_id]
try:
if proposal.strategy == OptimizationStrategy.PARALLELIZE:
# Enable parallelization for target steps
for step in workflow.steps:
if step.step_id in proposal.target_steps:
step.can_parallelize = True
elif proposal.strategy == OptimizationStrategy.CACHE:
# Enable caching for target steps
for step in workflow.steps:
if step.step_id in proposal.target_steps:
step.can_cache = True
step.cache_ttl = 300
# Update proposal status
proposal.status = "applied"
proposal.applied_at = datetime.now()
workflow.last_optimized = datetime.now()
# Store proposal
self.proposals[proposal.proposal_id] = proposal
logger.info(f"Applied optimization {proposal.proposal_id} to workflow {workflow.name}")
except Exception as e:
proposal.status = "failed"
logger.error(f"Failed to apply optimization {proposal.proposal_id}: {e}")
async def get_workflow_analytics(self, workflow_id: str) -> Dict[str, Any]:
"""Get comprehensive analytics for a workflow"""
if workflow_id not in self.workflows:
return {}
workflow = self.workflows[workflow_id]
        executions = [
            execution for execution in self.executions.values()
            if execution.workflow_id == workflow_id
        ]
# Calculate analytics
analytics = {
"workflow": {
"id": workflow.workflow_id,
"name": workflow.name,
"version": workflow.version,
"total_executions": workflow.total_executions,
"success_rate": workflow.success_rate,
"avg_execution_time": workflow.avg_execution_time,
"patterns": [p.value for p in workflow.patterns],
"last_optimized": workflow.last_optimized.isoformat() if workflow.last_optimized else None
},
"performance": {
"recent_executions": len(executions[-10:]) if executions else 0,
"performance_trend": self._calculate_performance_trend(workflow_id),
"bottlenecks": await self._identify_bottlenecks(workflow_id),
"optimization_opportunities": await self._get_optimization_opportunities(workflow_id)
},
"optimizations": {
"proposals_generated": len(self.proposals),
"proposals_applied": len([p for p in self.proposals.values() if p.status == "applied"]),
"last_optimization": workflow.last_optimized.isoformat() if workflow.last_optimized else None
}
}
return analytics
def _calculate_performance_trend(self, workflow_id: str) -> str:
"""Calculate performance trend for a workflow"""
if workflow_id not in self.performance_history or len(self.performance_history[workflow_id]) < 10:
return "insufficient_data"
history = self.performance_history[workflow_id]
recent = history[-5:]
older = history[-10:-5]
recent_avg = sum(recent) / len(recent)
older_avg = sum(older) / len(older)
if recent_avg < older_avg * 0.9:
return "improving"
elif recent_avg > older_avg * 1.1:
return "degrading"
else:
return "stable"
async def _identify_bottlenecks(self, workflow_id: str) -> List[str]:
"""Identify bottlenecks in a workflow"""
bottlenecks = []
if workflow_id not in self.workflows:
return bottlenecks
workflow = self.workflows[workflow_id]
# Find slow steps
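        # A step is "slow" if it takes more than 1.5x the mean per-step share
        # of the workflow's average execution time.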
slow_steps = [
step.step_name for step in workflow.steps
if step.avg_execution_time > workflow.avg_execution_time / len(workflow.steps) * 1.5
]
if slow_steps:
bottlenecks.append(f"Slow steps: {', '.join(slow_steps)}")
# Find unreliable steps
unreliable_steps = [
step.step_name for step in workflow.steps
if step.success_rate < 0.9
]
if unreliable_steps:
bottlenecks.append(f"Unreliable steps: {', '.join(unreliable_steps)}")
return bottlenecks
async def _get_optimization_opportunities(self, workflow_id: str) -> List[str]:
"""Get optimization opportunities for a workflow"""
opportunities = []
if workflow_id not in self.workflows:
return opportunities
workflow = self.workflows[workflow_id]
# Check for parallelization opportunities
sequential_steps = [
step for step in workflow.steps
if not step.can_parallelize and len(step.depends_on) <= 1
]
if len(sequential_steps) > 1:
opportunities.append(f"Can parallelize {len(sequential_steps)} sequential steps")
# Check for caching opportunities
non_cacheable_steps = [
step for step in workflow.steps
if not step.can_cache and step.success_rate > 0.95
]
if non_cacheable_steps:
opportunities.append(f"Can cache {len(non_cacheable_steps)} high-success steps")
return opportunities
async def get_optimization_proposals(self, workflow_id: Optional[str] = None) -> List[OptimizationProposal]:
"""Get optimization proposals"""
if workflow_id:
return [p for p in self.proposals.values() if p.workflow_id == workflow_id]
return list(self.proposals.values())
async def approve_proposal(self, proposal_id: str, reviewer: str) -> bool:
"""Approve an optimization proposal"""
if proposal_id not in self.proposals:
return False
proposal = self.proposals[proposal_id]
proposal.status = "approved"
proposal.reviewed_by = reviewer
proposal.reviewed_at = datetime.now()
# Apply the optimization
await self._apply_optimization(proposal)
return True
async def reject_proposal(self, proposal_id: str, reviewer: str, reason: str) -> bool:
"""Reject an optimization proposal"""
if proposal_id not in self.proposals:
return False
proposal = self.proposals[proposal_id]
proposal.status = "rejected"
proposal.reviewed_by = reviewer
proposal.reviewed_at = datetime.now()
proposal.metadata["rejection_reason"] = reason
return True