Skip to main content
Glama

Katamari MCP Server

by ciphernaut
workflow_optimizer.py • 30.8 kB
""" Workflow Optimization Engine - Phase 3 Advanced workflow analysis and optimization using machine learning and pattern recognition to improve capability orchestration. """ import asyncio import json from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Tuple, Set from dataclasses import dataclass, field from enum import Enum import uuid import logging from .data_models import ( FeedbackEvent, PerformanceSnapshot, CapabilityProfile, LearningRecord, AdaptationProposal, AdaptationType ) # from .adaptive_learning import AdaptiveLearningManager # from .performance_tracker import PerformanceTracker # from .feedback import FeedbackCollector logger = logging.getLogger(__name__) class WorkflowPattern(Enum): """Types of workflow patterns""" SEQUENTIAL = "sequential" PARALLEL = "parallel" PIPELINE = "pipeline" FAN_OUT = "fan_out" REDUCE = "reduce" CONDITIONAL = "conditional" ITERATIVE = "iterative" class OptimizationStrategy(Enum): """Workflow optimization strategies""" REORDER = "reorder" PARALLELIZE = "parallelize" CACHE = "cache" BATCH = "batch" SKIP = "skip" MERGE = "merge" SPLIT = "split" @dataclass class WorkflowStep: """Single step in a workflow""" step_id: str = field(default_factory=lambda: str(uuid.uuid4())) capability_id: str = "" step_name: str = "" step_type: str = "transform" # transform, validate, aggregate, etc. 
# Dependencies depends_on: List[str] = field(default_factory=list) outputs_to: List[str] = field(default_factory=list) # Execution parameters timeout: float = 30.0 retry_count: int = 3 priority: int = 5 # 1-10, higher is more important # Performance data avg_execution_time: float = 0.0 success_rate: float = 1.0 resource_intensity: float = 0.5 # 0-1 # Optimization flags can_parallelize: bool = True can_cache: bool = False cache_ttl: int = 300 # seconds batch_size: Optional[int] = None # Metadata metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class WorkflowDefinition: """Definition of a complete workflow""" workflow_id: str = field(default_factory=lambda: str(uuid.uuid4())) name: str = "" description: str = "" version: str = "1.0.0" # Workflow structure steps: List[WorkflowStep] = field(default_factory=list) patterns: List[WorkflowPattern] = field(default_factory=list) # Execution constraints max_parallel_steps: int = 5 total_timeout: float = 300.0 resource_limits: Dict[str, Any] = field(default_factory=dict) # Optimization settings auto_optimize: bool = True optimization_frequency: int = 100 # executions optimization_strategies: List[OptimizationStrategy] = field(default_factory=list) # Performance tracking total_executions: int = 0 avg_execution_time: float = 0.0 success_rate: float = 1.0 last_optimized: Optional[datetime] = None # Metadata created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) tags: List[str] = field(default_factory=list) metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class WorkflowExecution: """Record of a workflow execution""" execution_id: str = field(default_factory=lambda: str(uuid.uuid4())) workflow_id: str = "" started_at: datetime = field(default_factory=datetime.now) ended_at: Optional[datetime] = None # Execution plan execution_plan: List[str] = field(default_factory=list) # Step IDs in order parallel_groups: List[List[str]] = 
field(default_factory=list) # Results step_results: Dict[str, Any] = field(default_factory=dict) step_timings: Dict[str, float] = field(default_factory=dict) step_errors: Dict[str, str] = field(default_factory=dict) # Performance total_time: float = 0.0 parallel_efficiency: float = 0.0 # Actual vs theoretical parallel speedup resource_utilization: float = 0.0 # Status status: str = "running" # running, completed, failed, cancelled error_message: Optional[str] = None # Optimization data optimization_suggestions: List[str] = field(default_factory=list) bottlenecks: List[str] = field(default_factory=list) @dataclass class OptimizationProposal: """Proposal for workflow optimization""" proposal_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=datetime.now) workflow_id: str = "" # Proposal details strategy: OptimizationStrategy = OptimizationStrategy.REORDER target_steps: List[str] = field(default_factory=list) # Current vs proposed current_structure: Dict[str, Any] = field(default_factory=dict) proposed_structure: Dict[str, Any] = field(default_factory=dict) # Expected impact expected_time_reduction: float = 0.0 # percentage expected_resource_reduction: float = 0.0 # percentage expected_success_improvement: float = 0.0 # percentage # Confidence and evidence confidence_score: float = 0.0 # 0-1 supporting_data: List[str] = field(default_factory=list) similar_workflows: List[str] = field(default_factory=list) # Implementation implementation_complexity: str = "low" # low, medium, high rollback_plan: str = "" testing_required: bool = True # Status status: str = "proposed" # proposed, approved, rejected, applied, rolled_back reviewed_by: Optional[str] = None reviewed_at: Optional[datetime] = None applied_at: Optional[datetime] = None # Results actual_impact: Optional[Dict[str, float]] = None side_effects: List[str] = field(default_factory=list) # Additional fields for extensibility reasoning: str = "" metadata: Dict[str, 
Any] = field(default_factory=dict) class WorkflowOptimizer: """Advanced workflow optimization engine""" def __init__(self): # self.adaptive_learning = adaptive_learning # self.performance_tracker = performance_tracker # self.feedback_collector = feedback_collector # Workflow storage self.workflows: Dict[str, WorkflowDefinition] = {} self.executions: Dict[str, WorkflowExecution] = {} self.proposals: Dict[str, OptimizationProposal] = {} # Optimization state self.pattern_cache: Dict[str, List[WorkflowPattern]] = {} self.performance_history: Dict[str, List[float]] = {} self.optimization_lock = asyncio.Lock() # ML models (simplified for Phase 3) self.bottleneck_detector = None self.pattern_matcher = None self.performance_predictor = None logger.info("WorkflowOptimizer initialized") async def register_workflow(self, workflow: WorkflowDefinition) -> str: """Register a new workflow for optimization""" workflow_id = workflow.workflow_id # Analyze workflow patterns workflow.patterns = await self._analyze_patterns(workflow) # Set default optimization strategies if not provided if not workflow.optimization_strategies: workflow.optimization_strategies = [ OptimizationStrategy.REORDER, OptimizationStrategy.PARALLELIZE, OptimizationStrategy.CACHE ] self.workflows[workflow_id] = workflow self.performance_history[workflow_id] = [] logger.info(f"Registered workflow: {workflow.name} ({workflow_id})") return workflow_id async def execute_workflow( self, workflow_id: str, context: Optional[Dict[str, Any]] = None ) -> WorkflowExecution: """Execute a workflow with optimization""" if workflow_id not in self.workflows: raise ValueError(f"Workflow {workflow_id} not found") workflow = self.workflows[workflow_id] execution = WorkflowExecution(workflow_id=workflow_id) try: # Generate optimized execution plan execution.execution_plan, execution.parallel_groups = await self._generate_execution_plan(workflow) # Execute the workflow await self._execute_workflow_steps(execution, workflow, context 
or {}) # Record execution execution.ended_at = datetime.now() execution.total_time = (execution.ended_at - execution.started_at).total_seconds() execution.status = "completed" # Update workflow statistics await self._update_workflow_stats(workflow_id, execution) # Check if optimization is needed if workflow.auto_optimize and workflow.total_executions % workflow.optimization_frequency == 0: asyncio.create_task(self._optimize_workflow(workflow_id)) except Exception as e: execution.status = "failed" execution.error_message = str(e) logger.error(f"Workflow execution failed: {e}") self.executions[execution.execution_id] = execution return execution async def _generate_execution_plan( self, workflow: WorkflowDefinition ) -> Tuple[List[str], List[List[str]]]: """Generate optimized execution plan""" steps = {step.step_id: step for step in workflow.steps} # Build dependency graph dependencies = {step_id: step.depends_on for step_id, step in steps.items()} # Topological sort for basic ordering plan = self._topological_sort(dependencies) # Identify parallelizable groups parallel_groups = self._identify_parallel_groups(plan, dependencies, steps) # Apply optimizations if OptimizationStrategy.REORDER in workflow.optimization_strategies: plan = await self._optimize_step_order(plan, steps) return plan, parallel_groups def _topological_sort(self, dependencies: Dict[str, List[str]]) -> List[str]: """Topological sort of workflow steps using Kahn's algorithm Args: dependencies: Dict where key = step_id, value = list of step_ids it depends on Returns: List of step_ids in topological order (dependencies first) """ # Initialize in-degree for all nodes to 0 all_nodes = set(dependencies.keys()) for deps in dependencies.values(): all_nodes.update(deps) in_degree = {node: 0 for node in all_nodes} # Calculate in-degree: count how many dependencies each node has for node, deps in dependencies.items(): for dep in deps: in_degree[node] += 1 # node depends on dep # Start with nodes that have no 
dependencies (in-degree = 0) queue = [node for node in all_nodes if in_degree[node] == 0] result = [] while queue: # Sort queue for deterministic results queue.sort() node = queue.pop(0) result.append(node) # Find nodes that depend on this node and reduce their in-degree for other_node, deps in dependencies.items(): if node in deps: in_degree[other_node] -= 1 if in_degree[other_node] == 0: queue.append(other_node) # Check if we have a valid topological order (no cycles) if len(result) != len(all_nodes): raise ValueError("Cycle detected in workflow dependencies") return result def _identify_parallel_groups( self, plan: List[str], dependencies: Dict[str, List[str]], steps: Dict[str, WorkflowStep] ) -> List[List[str]]: """Identify groups of steps that can run in parallel""" groups = [] remaining = set(plan) while remaining: # Find steps with no unmet dependencies in remaining set ready = [] for step_id in remaining: deps = set(dependencies[step_id]) if deps.isdisjoint(remaining): ready.append(step_id) # Filter by parallelization capability and resource limits parallel_ready = [ step_id for step_id in ready if steps[step_id].can_parallelize ] if parallel_ready: groups.append(parallel_ready) remaining -= set(parallel_ready) else: # No parallelizable steps ready, execute single step groups.append([ready[0]]) remaining.remove(ready[0]) return groups async def _optimize_step_order( self, plan: List[str], steps: Dict[str, WorkflowStep] ) -> List[str]: """Optimize step ordering based on performance data""" # Simple heuristic: prioritize faster, more reliable steps step_scores = { step_id: ( steps[step_id].success_rate * 0.6 + (1.0 / (1.0 + steps[step_id].avg_execution_time)) * 0.4 ) for step_id in plan } # Sort within dependency constraints optimized_plan = [] remaining = set(plan) while remaining: # Find steps with all dependencies satisfied ready = [ step_id for step_id in remaining if all(dep in optimized_plan for dep in steps[step_id].depends_on) ] if not ready: # Should 
not happen with valid DAG ready = list(remaining) # Sort by score (highest first) ready.sort(key=lambda x: step_scores[x], reverse=True) # Add best step optimized_plan.append(ready[0]) remaining.remove(ready[0]) return optimized_plan async def _execute_workflow_steps( self, execution: WorkflowExecution, workflow: WorkflowDefinition, context: Dict[str, Any] ): """Execute workflow steps according to plan""" steps = {step.step_id: step for step in workflow.steps} step_start_times = {} for group in execution.parallel_groups: if len(group) == 1: # Sequential execution step_id = group[0] step_start_times[step_id] = datetime.now() await self._execute_single_step(step_id, steps[step_id], execution, context) else: # Parallel execution tasks = [] for step_id in group: step_start_times[step_id] = datetime.now() task = asyncio.create_task( self._execute_single_step(step_id, steps[step_id], execution, context) ) tasks.append(task) await asyncio.gather(*tasks, return_exceptions=True) # Calculate step timings for step_id in execution.step_results: if step_id in step_start_times: duration = (datetime.now() - step_start_times[step_id]).total_seconds() execution.step_timings[step_id] = duration async def _execute_single_step( self, step_id: str, step: WorkflowStep, execution: WorkflowExecution, context: Dict[str, Any] ): """Execute a single workflow step""" try: # Simulate step execution (in real implementation, this would call the capability) execution_time = step.avg_execution_time * (0.8 + 0.4 * hash(step_id) % 10 / 10) await asyncio.sleep(min(execution_time, 0.1)) # Cap for demo # Record result execution.step_results[step_id] = {"status": "success", "data": f"Result of {step_id}"} except Exception as e: execution.step_errors[step_id] = str(e) execution.step_results[step_id] = {"status": "error", "error": str(e)} async def _update_workflow_stats(self, workflow_id: str, execution: WorkflowExecution): """Update workflow statistics after execution""" if workflow_id not in 
self.workflows: return workflow = self.workflows[workflow_id] workflow.total_executions += 1 # Update execution time if workflow.total_executions == 1: workflow.avg_execution_time = execution.total_time else: workflow.avg_execution_time = ( (workflow.avg_execution_time * (workflow.total_executions - 1) + execution.total_time) / workflow.total_executions ) # Update success rate if execution.status == "completed": workflow.success_rate = ( (workflow.success_rate * (workflow.total_executions - 1) + 1.0) / workflow.total_executions ) else: workflow.success_rate = ( (workflow.success_rate * (workflow.total_executions - 1)) / workflow.total_executions ) workflow.updated_at = datetime.now() # Track performance history self.performance_history[workflow_id].append(execution.total_time) if len(self.performance_history[workflow_id]) > 1000: self.performance_history[workflow_id] = self.performance_history[workflow_id][-500:] async def _analyze_patterns(self, workflow: WorkflowDefinition) -> List[WorkflowPattern]: """Analyze workflow patterns""" patterns = [] steps = workflow.steps # Check for sequential pattern if all(len(step.depends_on) <= 1 and len(step.outputs_to) <= 1 for step in steps): patterns.append(WorkflowPattern.SEQUENTIAL) # Check for parallel pattern parallel_steps = sum(1 for step in steps if step.can_parallelize) if parallel_steps > 1: patterns.append(WorkflowPattern.PARALLEL) # Check for pipeline pattern if len(steps) > 2 and all( len(step.depends_on) == 1 and len(step.outputs_to) == 1 for step in steps[1:-1] ): patterns.append(WorkflowPattern.PIPELINE) # Check for fan-out pattern fan_out_steps = sum(1 for step in steps if len(step.outputs_to) > 1) if fan_out_steps > 0: patterns.append(WorkflowPattern.FAN_OUT) # Check for reduce pattern reduce_steps = sum(1 for step in steps if len(step.depends_on) > 1) if reduce_steps > 0: patterns.append(WorkflowPattern.REDUCE) return patterns async def _optimize_workflow(self, workflow_id: str): """Optimize a workflow based 
on performance data""" async with self.optimization_lock: if workflow_id not in self.workflows: return workflow = self.workflows[workflow_id] # Analyze performance and identify optimization opportunities proposals = await self._generate_optimization_proposals(workflow_id) # Apply high-confidence proposals for proposal in proposals: if proposal.confidence_score > 0.8 and proposal.implementation_complexity == "low": await self._apply_optimization(proposal) async def _generate_optimization_proposals(self, workflow_id: str) -> List[OptimizationProposal]: """Generate optimization proposals for a workflow""" proposals = [] workflow = self.workflows[workflow_id] if workflow_id not in self.performance_history or len(self.performance_history[workflow_id]) < 10: return proposals performance_data = self.performance_history[workflow_id] recent_performance = performance_data[-10:] avg_recent = sum(recent_performance) / len(recent_performance) # Analyze bottlenecks if avg_recent > workflow.avg_execution_time * 1.2: # Performance degradation detected proposal = await self._create_parallelization_proposal(workflow) if proposal: proposals.append(proposal) # Analyze execution patterns if workflow.total_executions > 50: cache_proposal = await self._create_caching_proposal(workflow) if cache_proposal: proposals.append(cache_proposal) return proposals async def _create_parallelization_proposal(self, workflow: WorkflowDefinition) -> Optional[OptimizationProposal]: """Create proposal for parallelization optimization""" # Find sequential steps that could be parallelized sequential_steps = [ step for step in workflow.steps if not step.can_parallelize and len(step.depends_on) <= 1 ] if len(sequential_steps) < 2: return None # Create proposal proposal = OptimizationProposal( workflow_id=workflow.workflow_id, strategy=OptimizationStrategy.PARALLELIZE, target_steps=[step.step_id for step in sequential_steps[:2]], # Start with 2 steps confidence_score=0.7, expected_time_reduction=15.0, # 15% 
improvement expected implementation_complexity="medium" ) proposal.reasoning = "Sequential steps identified that can be parallelized to improve throughput" proposal.supporting_data = [ f"Found {len(sequential_steps)} sequential steps", f"Current execution time: {workflow.avg_execution_time:.2f}s" ] return proposal async def _create_caching_proposal(self, workflow: WorkflowDefinition) -> Optional[OptimizationProposal]: """Create proposal for caching optimization""" # Find steps with repeatable results cacheable_steps = [ step for step in workflow.steps if not step.can_cache and step.success_rate > 0.95 ] if not cacheable_steps: return None proposal = OptimizationProposal( workflow_id=workflow.workflow_id, strategy=OptimizationStrategy.CACHE, target_steps=[step.step_id for step in cacheable_steps[:1]], confidence_score=0.8, expected_time_reduction=10.0, implementation_complexity="low" ) proposal.reasoning = "High-success steps identified as candidates for result caching" proposal.supporting_data = [ f"Found {len(cacheable_steps)} cacheable steps", f"Average success rate: {sum(s.success_rate for s in cacheable_steps) / len(cacheable_steps):.2%}" ] return proposal async def _apply_optimization(self, proposal: OptimizationProposal): """Apply an optimization proposal""" if proposal.workflow_id not in self.workflows: return workflow = self.workflows[proposal.workflow_id] try: if proposal.strategy == OptimizationStrategy.PARALLELIZE: # Enable parallelization for target steps for step in workflow.steps: if step.step_id in proposal.target_steps: step.can_parallelize = True elif proposal.strategy == OptimizationStrategy.CACHE: # Enable caching for target steps for step in workflow.steps: if step.step_id in proposal.target_steps: step.can_cache = True step.cache_ttl = 300 # Update proposal status proposal.status = "applied" proposal.applied_at = datetime.now() workflow.last_optimized = datetime.now() # Store proposal self.proposals[proposal.proposal_id] = proposal 
logger.info(f"Applied optimization {proposal.proposal_id} to workflow {workflow.name}") except Exception as e: proposal.status = "failed" logger.error(f"Failed to apply optimization {proposal.proposal_id}: {e}") async def get_workflow_analytics(self, workflow_id: str) -> Dict[str, Any]: """Get comprehensive analytics for a workflow""" if workflow_id not in self.workflows: return {} workflow = self.workflows[workflow_id] executions = [ exec for exec in self.executions.values() if exec.workflow_id == workflow_id ] # Calculate analytics analytics = { "workflow": { "id": workflow.workflow_id, "name": workflow.name, "version": workflow.version, "total_executions": workflow.total_executions, "success_rate": workflow.success_rate, "avg_execution_time": workflow.avg_execution_time, "patterns": [p.value for p in workflow.patterns], "last_optimized": workflow.last_optimized.isoformat() if workflow.last_optimized else None }, "performance": { "recent_executions": len(executions[-10:]) if executions else 0, "performance_trend": self._calculate_performance_trend(workflow_id), "bottlenecks": await self._identify_bottlenecks(workflow_id), "optimization_opportunities": await self._get_optimization_opportunities(workflow_id) }, "optimizations": { "proposals_generated": len(self.proposals), "proposals_applied": len([p for p in self.proposals.values() if p.status == "applied"]), "last_optimization": workflow.last_optimized.isoformat() if workflow.last_optimized else None } } return analytics def _calculate_performance_trend(self, workflow_id: str) -> str: """Calculate performance trend for a workflow""" if workflow_id not in self.performance_history or len(self.performance_history[workflow_id]) < 10: return "insufficient_data" history = self.performance_history[workflow_id] recent = history[-5:] older = history[-10:-5] recent_avg = sum(recent) / len(recent) older_avg = sum(older) / len(older) if recent_avg < older_avg * 0.9: return "improving" elif recent_avg > older_avg * 1.1: 
return "degrading" else: return "stable" async def _identify_bottlenecks(self, workflow_id: str) -> List[str]: """Identify bottlenecks in a workflow""" bottlenecks = [] if workflow_id not in self.workflows: return bottlenecks workflow = self.workflows[workflow_id] # Find slow steps slow_steps = [ step.step_name for step in workflow.steps if step.avg_execution_time > workflow.avg_execution_time / len(workflow.steps) * 1.5 ] if slow_steps: bottlenecks.append(f"Slow steps: {', '.join(slow_steps)}") # Find unreliable steps unreliable_steps = [ step.step_name for step in workflow.steps if step.success_rate < 0.9 ] if unreliable_steps: bottlenecks.append(f"Unreliable steps: {', '.join(unreliable_steps)}") return bottlenecks async def _get_optimization_opportunities(self, workflow_id: str) -> List[str]: """Get optimization opportunities for a workflow""" opportunities = [] if workflow_id not in self.workflows: return opportunities workflow = self.workflows[workflow_id] # Check for parallelization opportunities sequential_steps = [ step for step in workflow.steps if not step.can_parallelize and len(step.depends_on) <= 1 ] if len(sequential_steps) > 1: opportunities.append(f"Can parallelize {len(sequential_steps)} sequential steps") # Check for caching opportunities non_cacheable_steps = [ step for step in workflow.steps if not step.can_cache and step.success_rate > 0.95 ] if non_cacheable_steps: opportunities.append(f"Can cache {len(non_cacheable_steps)} high-success steps") return opportunities async def get_optimization_proposals(self, workflow_id: Optional[str] = None) -> List[OptimizationProposal]: """Get optimization proposals""" if workflow_id: return [p for p in self.proposals.values() if p.workflow_id == workflow_id] return list(self.proposals.values()) async def approve_proposal(self, proposal_id: str, reviewer: str) -> bool: """Approve an optimization proposal""" if proposal_id not in self.proposals: return False proposal = self.proposals[proposal_id] 
proposal.status = "approved" proposal.reviewed_by = reviewer proposal.reviewed_at = datetime.now() # Apply the optimization await self._apply_optimization(proposal) return True async def reject_proposal(self, proposal_id: str, reviewer: str, reason: str) -> bool: """Reject an optimization proposal""" if proposal_id not in self.proposals: return False proposal = self.proposals[proposal_id] proposal.status = "rejected" proposal.reviewed_by = reviewer proposal.reviewed_at = datetime.now() proposal.metadata["rejection_reason"] = reason return True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.