workflow_runner_demo.py•14.8 kB
"""
Workflow Runner Tool Demonstration.
This example demonstrates how to use the WorkflowRunnerTool to execute
different workflows with various parameters and configurations.
"""
import asyncio
import logging
import os
from dotenv import load_dotenv
load_dotenv()  # Load environment variables from .env file
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Mock FastMCP Context for demonstration
class MockContext:
    """Mock context for demonstration purposes."""
    def __init__(self):
        self.progress_updates = []
        self.info_messages = []
        self.error_messages = []
    async def report_progress(self, progress: int, total: int = 100):
        """Mock progress reporting."""
        self.progress_updates.append((progress, total))
        logger.info(f"Progress: {progress}/{total} ({progress/total*100:.1f}%)")
    async def info(self, message: str):
        """Mock info message."""
        self.info_messages.append(message)
        logger.info(f"INFO: {message}")
    async def error(self, message: str):
        """Mock error message."""
        self.error_messages.append(message)
        logger.error(f"ERROR: {message}")
async def demonstrate_workflow_runner():
    """Demonstrate the WorkflowRunnerTool capabilities."""
    print("=" * 80)
    print("WORKFLOW RUNNER TOOL DEMONSTRATION")
    print("=" * 80)
    # Check environment setup
    if not os.getenv("OPENAI_API_KEY"):
        print("⚠️  OPENAI_API_KEY not set. This is required for the WorkflowRunnerTool.")
        print("   Set it in your .env file or environment variables.")
        return
    try:
        # Import the workflow runner tool
        from src.tools.workflows.workflow_runner import WorkflowRunnerTool
        print("✅ WorkflowRunnerTool imported successfully")
        # Create the tool instance
        print("\n🔧 Initializing WorkflowRunnerTool...")
        workflow_runner = WorkflowRunnerTool("workflow_runner", "workflows")
        print("✅ WorkflowRunnerTool initialized")
        # Create mock context
        ctx = MockContext()
        # Example 1: Execute missing data troubleshooting workflow
        print("\n" + "=" * 60)
        print("EXAMPLE 1: Missing Data Troubleshooting Workflow")
        print("=" * 60)
        try:
            result1 = await workflow_runner.execute(
                ctx=ctx,
                workflow_id="missing_data_troubleshooting",
                problem_description="Dashboard shows no data for the last 2 hours",
                earliest_time="-2h",
                latest_time="now",
                focus_index="main",
                complexity_level="moderate",
                enable_summarization=True,
            )
            print(f"✅ Workflow Status: {result1['status']}")
            print(f"📊 Workflow: {result1['workflow_name']}")
            print(
                f"⏱️  Execution Time: {result1['execution_metadata']['total_execution_time']:.2f}s"
            )
            print(f"🔄 Tasks: {result1['workflow_execution']['total_tasks']}")
            print(f"✅ Successful: {result1['workflow_execution']['successful_tasks']}")
            print(f"❌ Failed: {result1['workflow_execution']['failed_tasks']}")
            if result1["summarization"]["enabled"]:
                print(
                    f"🧠 Summarization: Completed in {result1['summarization']['execution_time']:.2f}s"
                )
        except Exception as e:
            print(f"❌ Example 1 failed: {e}")
        # Example 2: Execute performance analysis workflow
        print("\n" + "=" * 60)
        print("EXAMPLE 2: Performance Analysis Workflow")
        print("=" * 60)
        try:
            result2 = await workflow_runner.execute(
                ctx=ctx,
                workflow_id="performance_analysis",
                problem_description="Searches are running very slowly since yesterday",
                earliest_time="-24h",
                latest_time="now",
                focus_host="search-head-01",
                complexity_level="advanced",
                enable_summarization=True,
            )
            print(f"✅ Workflow Status: {result2['status']}")
            print(f"📊 Workflow: {result2['workflow_name']}")
            print(
                f"⏱️  Execution Time: {result2['execution_metadata']['total_execution_time']:.2f}s"
            )
            print(f"🔄 Tasks: {result2['workflow_execution']['total_tasks']}")
            print(
                f"⚡ Parallel Efficiency: {result2['workflow_execution']['parallel_efficiency']:.1%}"
            )
        except Exception as e:
            print(f"❌ Example 2 failed: {e}")
        # Example 3: Execute workflow without summarization
        print("\n" + "=" * 60)
        print("EXAMPLE 3: Workflow Execution Without Summarization")
        print("=" * 60)
        try:
            result3 = await workflow_runner.execute(
                ctx=ctx,
                workflow_id="missing_data_troubleshooting",
                problem_description="Quick check for data availability",
                complexity_level="basic",
                enable_summarization=False,  # Disable summarization for faster execution
            )
            print(f"✅ Workflow Status: {result3['status']}")
            print(f"🧠 Summarization: {result3['summarization']['enabled']}")
            print(
                f"⏱️  Execution Time: {result3['execution_metadata']['total_execution_time']:.2f}s"
            )
        except Exception as e:
            print(f"❌ Example 3 failed: {e}")
        # Example 4: Handle workflow not found error
        print("\n" + "=" * 60)
        print("EXAMPLE 4: Error Handling - Workflow Not Found")
        print("=" * 60)
        try:
            result4 = await workflow_runner.execute(
                ctx=ctx, workflow_id="non_existent_workflow", complexity_level="moderate"
            )
            if result4["status"] == "error":
                print(f"❌ Expected Error: {result4['error']}")
                print(f"📋 Available Workflows: {', '.join(result4['available_workflows'])}")
        except Exception as e:
            print(f"❌ Example 4 failed: {e}")
        # Example 5: Demonstrate parameter flexibility
        print("\n" + "=" * 60)
        print("EXAMPLE 5: Parameter Flexibility")
        print("=" * 60)
        try:
            result5 = await workflow_runner.execute(
                ctx=ctx,
                workflow_id="performance_analysis",
                problem_description="Comprehensive system analysis",
                earliest_time="-7d",  # Week-long analysis
                latest_time="now",
                focus_index="security",  # Focus on security index
                focus_host="indexer-03",  # Specific indexer
                focus_sourcetype="syslog",  # Specific sourcetype
                complexity_level="advanced",
                enable_summarization=True,
            )
            print(f"✅ Workflow Status: {result5['status']}")
            print("🎯 Focus Areas:")
            print(f"   - Index: {result5['diagnostic_context']['focus_index']}")
            print(f"   - Host: {result5['diagnostic_context']['focus_host']}")
            print(f"   - Sourcetype: {result5['diagnostic_context']['focus_sourcetype']}")
            print(
                f"   - Time Range: {result5['diagnostic_context']['earliest_time']} to {result5['diagnostic_context']['latest_time']}"
            )
            print(f"   - Complexity: {result5['diagnostic_context']['complexity_level']}")
        except Exception as e:
            print(f"❌ Example 5 failed: {e}")
        print("\n" + "=" * 80)
        print("WORKFLOW RUNNER DEMONSTRATION COMPLETED")
        print("=" * 80)
        # Summary
        print("\n📋 SUMMARY:")
        print(f"   - Total progress updates: {len(ctx.progress_updates)}")
        print(f"   - Total info messages: {len(ctx.info_messages)}")
        print(f"   - Total error messages: {len(ctx.error_messages)}")
        if ctx.info_messages:
            print("\n💡 Recent info messages:")
            for msg in ctx.info_messages[-3:]:  # Show last 3 messages
                print(f"   - {msg}")
    except ImportError as e:
        print(f"❌ Failed to import WorkflowRunnerTool: {e}")
        print("   Make sure the tool is properly installed and dependencies are met.")
    except Exception as e:
        print(f"❌ Demonstration failed: {e}")
        import traceback
        traceback.print_exc()
async def demonstrate_workflow_comparison():
    """Demonstrate comparing different workflow executions."""
    print("\n" + "=" * 80)
    print("WORKFLOW COMPARISON DEMONSTRATION")
    print("=" * 80)
    try:
        from src.tools.workflows.list_workflows import ListWorkflowsTool
        from src.tools.workflows.workflow_runner import WorkflowRunnerTool
        # Create tools
        workflow_runner = WorkflowRunnerTool("workflow_runner", "workflows")
        workflow_lister = ListWorkflowsTool("list_workflows", "workflows")
        ctx = MockContext()
        # First, list available workflows
        print("📋 Listing available workflows...")
        workflows_list = await workflow_lister.execute(ctx=ctx, format_type="summary")
        if workflows_list.get("status") == "success":
            workflows = workflows_list["data"]["workflows"]
            print(f"✅ Found {len(workflows)} workflows:")
            for workflow_id, info in workflows.items():
                print(f"   - {workflow_id}: {info['name']} ({info['task_count']} tasks)")
        # Compare execution times for different complexity levels
        print("\n⚡ Comparing execution with different complexity levels...")
        complexity_levels = ["basic", "moderate", "advanced"]
        results = {}
        for complexity in complexity_levels:
            try:
                print(f"\n🔄 Running with complexity: {complexity}")
                result = await workflow_runner.execute(
                    ctx=ctx,
                    workflow_id="missing_data_troubleshooting",
                    problem_description=f"Test run with {complexity} complexity",
                    complexity_level=complexity,
                    enable_summarization=False,  # Disable for fair comparison
                )
                results[complexity] = {
                    "status": result["status"],
                    "execution_time": result["execution_metadata"]["total_execution_time"],
                    "tasks": result["workflow_execution"]["total_tasks"],
                    "successful": result["workflow_execution"]["successful_tasks"],
                }
                print(
                    f"   ✅ {complexity}: {result['status']} in {result['execution_metadata']['total_execution_time']:.2f}s"
                )
            except Exception as e:
                print(f"   ❌ {complexity}: Failed - {e}")
                results[complexity] = {"status": "error", "error": str(e)}
        # Display comparison
        print("\n📊 EXECUTION COMPARISON:")
        print(f"{'Complexity':<12} {'Status':<10} {'Time (s)':<10} {'Tasks':<8} {'Success':<8}")
        print("-" * 60)
        for complexity, data in results.items():
            if data["status"] != "error":
                print(
                    f"{complexity:<12} {data['status']:<10} {data['execution_time']:<10.2f} {data['tasks']:<8} {data['successful']:<8}"
                )
            else:
                print(f"{complexity:<12} {'ERROR':<10} {'N/A':<10} {'N/A':<8} {'N/A':<8}")
    except Exception as e:
        print(f"❌ Workflow comparison failed: {e}")
def print_usage_examples():
    """Print usage examples for the WorkflowRunnerTool."""
    print("\n" + "=" * 80)
    print("WORKFLOW RUNNER USAGE EXAMPLES")
    print("=" * 80)
    examples = [
        {
            "title": "Basic Missing Data Analysis",
            "code": """
# Execute missing data troubleshooting workflow
result = await workflow_runner.execute(
    ctx=ctx,
    workflow_id="missing_data_troubleshooting",
    problem_description="Dashboard shows no data",
    earliest_time="-24h",
    latest_time="now"
)
            """,
            "description": "Run the standard missing data troubleshooting workflow",
        },
        {
            "title": "Performance Analysis with Focus",
            "code": """
# Execute performance analysis for specific host
result = await workflow_runner.execute(
    ctx=ctx,
    workflow_id="performance_analysis",
    problem_description="Slow searches on indexer-01",
    focus_host="indexer-01",
    complexity_level="advanced",
    enable_summarization=True
)
            """,
            "description": "Run performance analysis focused on a specific host",
        },
        {
            "title": "Quick Basic Analysis",
            "code": """
# Quick basic analysis without summarization
result = await workflow_runner.execute(
    ctx=ctx,
    workflow_id="missing_data_troubleshooting",
    complexity_level="basic",
    enable_summarization=False
)
            """,
            "description": "Fast execution with basic complexity and no summarization",
        },
        {
            "title": "Targeted Index Analysis",
            "code": """
# Analyze specific index and sourcetype
result = await workflow_runner.execute(
    ctx=ctx,
    workflow_id="performance_analysis",
    focus_index="security",
    focus_sourcetype="firewall",
    earliest_time="-7d",
    complexity_level="moderate"
)
            """,
            "description": "Focus analysis on specific index and sourcetype",
        },
    ]
    for i, example in enumerate(examples, 1):
        print(f"\n{i}. {example['title']}")
        print(f"   {example['description']}")
        print(f"   ```python{example['code']}   ```")
if __name__ == "__main__":
    """Main demonstration runner."""
    print("🚀 Starting Workflow Runner Tool Demonstration")
    # Run the main demonstration
    asyncio.run(demonstrate_workflow_runner())
    # Run workflow comparison
    asyncio.run(demonstrate_workflow_comparison())
    # Print usage examples
    print_usage_examples()
    print("\n✅ Demonstration completed!")
    print("\n💡 To use the WorkflowRunnerTool in your own code:")
    print("   1. Import: from src.tools.workflows.workflow_runner import WorkflowRunnerTool")
    print("   2. Initialize: tool = WorkflowRunnerTool('workflow_runner', 'workflows')")
    print("   3. Execute: result = await tool.execute(ctx=context, workflow_id='your_workflow')")
    print("   4. Use list_workflows tool to discover available workflow IDs")