# example_workflow.py • 2.84 kB
"""Example workflow using delegation MCP."""
import asyncio
from pathlib import Path
from delegation_mcp import DelegationConfig, OrchestratorRegistry, DelegationEngine
async def main() -> None:
    """Run the example delegation workflow and print a report to stdout.

    Steps, in order:

    1. Load the delegation rules from the relative path
       ``config/delegation_rules.yaml``.
    2. Build an orchestrator registry and a delegation engine, then register
       every orchestrator listed in the configuration.
    3. Send four sample queries through the engine and print, for each one,
       the primary orchestrator, the delegation target, the matched rule
       pattern (or ``None``) and the success flag.
    4. Print the aggregate statistics reported by the engine.
    """
    # Load configuration
    config_path = Path("config/delegation_rules.yaml")
    config = DelegationConfig.from_yaml(config_path)

    # Setup registry and engine
    registry = OrchestratorRegistry()
    engine = DelegationEngine(config, registry)

    # Register orchestrators. Only the config objects are needed here, so
    # iterate the values instead of unpacking unused names.
    for orch_config in config.orchestrators.values():
        registry.register(orch_config)

    print("=== Delegation MCP Example Workflow ===\n")

    # (title, query) pairs. The trailing remarks record the outcome the
    # example is meant to demonstrate, not something this script enforces.
    examples = (
        # Security audit (should delegate to Gemini)
        ("Security Audit", "Run a security audit on authentication module"),
        # Refactoring (should delegate to Aider)
        ("Refactoring", "Refactor the database connection code"),
        # Pull request (should delegate to Copilot if enabled)
        ("Pull Request", "Create a pull request for the new feature"),
        # No rule match (uses primary orchestrator)
        ("No Rule Match", "Explain how async/await works in Python"),
    )

    # Queries are awaited one at a time so the report keeps the listed order.
    for number, (title, query) in enumerate(examples, start=1):
        print(f"Example {number}: {title}")
        result = await engine.process(query)
        print(f" Primary: {result.orchestrator}")
        print(f" Delegated to: {result.delegated_to}")
        # result.rule is falsy when no delegation rule matched the query.
        print(f" Rule matched: {result.rule.pattern if result.rule else 'None'}")
        print(f" Success: {result.success}\n")

    # Show statistics
    print("=== Statistics ===")
    stats = engine.get_statistics()
    print(f"Total queries: {stats['total']}")
    print(f"Delegations: {stats['delegations']}")
    print(f"Delegation rate: {stats['delegation_rate']:.1f}%")
    print(f"Success rate: {stats['success_rate']:.1f}%")
    print(f"Avg duration: {stats['avg_duration']:.2f}s")
    print("\nBy orchestrator:")
    for orch, count in stats['by_orchestrator'].items():
        print(f" {orch}: {count}")
# Script entry point: when this file is executed directly (not imported),
# asyncio.run() runs the main() coroutine to completion.
if __name__ == "__main__":
    asyncio.run(main())