#!/usr/bin/env python3
"""
Advanced Agentic Workflow Patterns for Claude Code MCP Integration
Implements multi-agent orchestration with specialized roles and MCP server coordination
"""
import asyncio
import json
import subprocess
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from pathlib import Path
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
@dataclass
class AgentConfig:
"""Configuration for individual agents"""
name: str
role: str
tmux_window: str
mcp_servers: List[str]
specialization: str
priority: int = 1
@dataclass
class WorkflowTask:
"""Represents a workflow task with agent coordination"""
id: str
title: str
description: str
agent_assignments: Dict[str, str]
dependencies: List[str]
estimated_duration: int # minutes
status: str = "pending"
created_at: datetime = None
updated_at: datetime = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
if self.updated_at is None:
self.updated_at = datetime.now()
class AgenticOrchestrator:
"""Advanced multi-agent orchestrator for Claude Code workflows"""
def __init__(self, session_name: str = "agentic-workflow"):
self.session_name = session_name
self.agents = {}
self.active_workflows = {}
self.task_queue = []
self.setup_default_agents()
def setup_default_agents(self):
"""Initialize default agent configurations"""
default_agents = [
AgentConfig(
name="research_agent",
role="Research Specialist",
tmux_window="research",
mcp_servers=["zotero", "google-scholar"],
specialization="Academic research, paper analysis, knowledge synthesis",
priority=1,
),
AgentConfig(
name="development_agent",
role="Senior Developer",
tmux_window="development",
mcp_servers=["filesystem", "sequential-thinking"],
specialization="Code implementation, architecture decisions, refactoring",
priority=1,
),
AgentConfig(
name="testing_agent",
role="QA Engineer",
tmux_window="testing",
mcp_servers=["playwright-ms", "playwright-community"],
specialization="Automated testing, web scraping, browser automation",
priority=2,
),
AgentConfig(
name="integration_agent",
role="DevOps Specialist",
tmux_window="integration",
mcp_servers=["filesystem", "sequential-thinking"],
specialization="CI/CD, deployment, system integration",
priority=2,
),
AgentConfig(
name="orchestrator_agent",
role="Project Coordinator",
tmux_window="orchestrator",
mcp_servers=["elevenlabs", "filesystem"],
specialization="Task coordination, progress tracking, team communication",
priority=0,
),
]
for agent in default_agents:
self.agents[agent.name] = agent
async def initialize_session(self) -> bool:
"""Initialize tmux session with all agent windows"""
logger.info(f"Initializing agentic session: {self.session_name}")
try:
# Check if session exists
result = subprocess.run(
["tmux", "has-session", "-t", self.session_name], capture_output=True, text=True
)
if result.returncode == 0:
logger.warning(f"Session {self.session_name} already exists")
return True
# Create new session
subprocess.run(
["tmux", "new-session", "-d", "-s", self.session_name, "-n", "orchestrator"]
)
# Create windows for each agent
for agent_name, agent in self.agents.items():
if agent_name != "orchestrator_agent": # orchestrator uses default window
subprocess.run(
["tmux", "new-window", "-t", self.session_name, "-n", agent.tmux_window]
)
# Initialize Claude Code in each window
target = f"{self.session_name}:{agent.tmux_window}"
subprocess.run(["tmux", "send-keys", "-t", target, "claude", "Enter"])
# Send initialization message
init_message = f"# {agent.role} - {agent.specialization}"
subprocess.run(["tmux", "send-keys", "-t", target, init_message, "Enter"])
if agent.mcp_servers:
server_message = f"# Available MCP servers: {', '.join(agent.mcp_servers)}"
subprocess.run(["tmux", "send-keys", "-t", target, server_message, "Enter"])
logger.info(f"Session initialized with {len(self.agents)} agent windows")
return True
except Exception as e:
logger.error(f"Failed to initialize session: {e}")
return False
async def create_workflow(self, workflow_spec: Dict[str, Any]) -> str:
"""Create a new multi-agent workflow"""
workflow_id = f"workflow_{int(time.time())}"
# Parse workflow specification
tasks = []
for task_spec in workflow_spec.get("tasks", []):
task = WorkflowTask(
id=f"task_{len(tasks)}",
title=task_spec["title"],
description=task_spec["description"],
agent_assignments=task_spec.get("agents", {}),
dependencies=task_spec.get("dependencies", []),
estimated_duration=task_spec.get("duration", 30),
)
tasks.append(task)
self.active_workflows[workflow_id] = {
"id": workflow_id,
"title": workflow_spec["title"],
"description": workflow_spec.get("description", ""),
"tasks": tasks,
"status": "active",
"created_at": datetime.now(),
}
logger.info(f"Created workflow {workflow_id} with {len(tasks)} tasks")
# Start workflow execution
await self.execute_workflow(workflow_id)
return workflow_id
async def execute_workflow(self, workflow_id: str):
"""Execute workflow tasks with agent coordination"""
workflow = self.active_workflows.get(workflow_id)
if not workflow:
logger.error(f"Workflow {workflow_id} not found")
return
logger.info(f"Executing workflow: {workflow['title']}")
# Send orchestration message
orchestrator_message = f"""
WORKFLOW INITIATED: {workflow['title']}
Total Tasks: {len(workflow['tasks'])}
Description: {workflow['description']}
Starting task coordination across {len(self.agents)} specialized agents.
Each agent will receive specific instructions based on their role and available MCP servers.
"""
await self.send_agent_message("orchestrator_agent", orchestrator_message)
# Process tasks in dependency order
for task in workflow["tasks"]:
await self.execute_task(task, workflow_id)
async def execute_task(self, task: WorkflowTask, workflow_id: str):
"""Execute individual task with appropriate agent"""
logger.info(f"Executing task: {task.title}")
# Determine best agent for this task
assigned_agents = task.agent_assignments
if not assigned_agents:
assigned_agents = await self.auto_assign_agents(task)
# Send task to assigned agents
for agent_name, instructions in assigned_agents.items():
if agent_name in self.agents:
task_message = f"""
TASK ASSIGNMENT: {task.title}
Description: {task.description}
Estimated Duration: {task.estimated_duration} minutes
Workflow: {workflow_id}
Specific Instructions:
{instructions}
Please begin work and provide regular progress updates.
Use your available MCP servers as needed for this task.
"""
await self.send_agent_message(agent_name, task_message)
# Schedule progress check
await self.schedule_progress_check(task, workflow_id)
async def auto_assign_agents(self, task: WorkflowTask) -> Dict[str, str]:
"""Automatically assign agents based on task requirements"""
assignments = {}
# Simple heuristics for agent assignment
task_lower = task.description.lower()
if any(keyword in task_lower for keyword in ["research", "paper", "study", "analysis"]):
assignments[
"research_agent"
] = "Conduct comprehensive research using Zotero and Google Scholar integration"
if any(keyword in task_lower for keyword in ["implement", "code", "develop", "build"]):
assignments[
"development_agent"
] = "Implement the solution following best practices and architectural guidelines"
if any(keyword in task_lower for keyword in ["test", "verify", "validate", "automation"]):
assignments[
"testing_agent"
] = "Create and execute comprehensive tests using Playwright automation"
if any(keyword in task_lower for keyword in ["deploy", "integrate", "setup", "configure"]):
assignments[
"integration_agent"
] = "Handle deployment and system integration requirements"
# Ensure orchestrator is always involved for coordination
assignments["orchestrator_agent"] = "Monitor task progress and coordinate between agents"
return assignments
async def send_agent_message(self, agent_name: str, message: str):
"""Send message to specific agent via tmux"""
if agent_name not in self.agents:
logger.error(f"Agent {agent_name} not found")
return
agent = self.agents[agent_name]
target = f"{self.session_name}:{agent.tmux_window}"
try:
# Send the message
subprocess.run(["tmux", "send-keys", "-t", target, message])
await asyncio.sleep(0.5)
subprocess.run(["tmux", "send-keys", "-t", target, "Enter"])
logger.info(f"Message sent to {agent.role} ({agent_name})")
except Exception as e:
logger.error(f"Failed to send message to {agent_name}: {e}")
async def schedule_progress_check(self, task: WorkflowTask, workflow_id: str):
"""Schedule automated progress check"""
check_interval = min(task.estimated_duration // 2, 15) # Check halfway or max 15 min
async def progress_checker():
await asyncio.sleep(check_interval * 60) # Convert to seconds
progress_message = f"""
PROGRESS CHECK: {task.title}
Please provide a status update on your current progress:
- What has been completed?
- What are you currently working on?
- Any blockers or issues?
- Estimated time remaining?
This is an automated check scheduled every {check_interval} minutes.
"""
for agent_name in task.agent_assignments:
await self.send_agent_message(agent_name, progress_message)
# Start progress checker as background task
asyncio.create_task(progress_checker())
async def status_report(self) -> Dict[str, Any]:
"""Generate comprehensive status report"""
report = {
"session": self.session_name,
"timestamp": datetime.now().isoformat(),
"agents": {name: asdict(config) for name, config in self.agents.items()},
"active_workflows": len(self.active_workflows),
"total_tasks": sum(len(wf["tasks"]) for wf in self.active_workflows.values()),
}
# Check MCP server status
try:
result = subprocess.run(["claude", "mcp", "list"], capture_output=True, text=True)
report["mcp_status"] = result.stdout
except Exception as e:
report["mcp_status"] = f"Error checking MCP status: {e}"
return report
def save_state(self, filepath: str = None):
"""Save orchestrator state to file"""
if filepath is None:
filepath = f"orchestrator_state_{int(time.time())}.json"
state = {
"session_name": self.session_name,
"agents": {name: asdict(config) for name, config in self.agents.items()},
"active_workflows": {},
}
# Serialize workflows (handle datetime objects)
for wf_id, workflow in self.active_workflows.items():
serializable_workflow = workflow.copy()
serializable_workflow["created_at"] = workflow["created_at"].isoformat()
serializable_workflow["tasks"] = []
for task in workflow["tasks"]:
task_dict = asdict(task)
task_dict["created_at"] = task.created_at.isoformat()
task_dict["updated_at"] = task.updated_at.isoformat()
serializable_workflow["tasks"].append(task_dict)
state["active_workflows"][wf_id] = serializable_workflow
with open(filepath, "w") as f:
json.dump(state, f, indent=2)
logger.info(f"State saved to {filepath}")
# Example workflow patterns
RESEARCH_WORKFLOW = {
"title": "Research-Driven Development Workflow",
"description": "End-to-end workflow combining academic research with practical implementation",
"tasks": [
{
"title": "Literature Review",
"description": "Conduct comprehensive research on the topic using academic databases and existing knowledge",
"agents": {
"research_agent": "Use Zotero and Google Scholar to find relevant papers and synthesize findings"
},
"duration": 45,
},
{
"title": "Technical Architecture",
"description": "Design technical approach based on research findings",
"agents": {
"development_agent": "Create architecture and implementation plan based on research insights"
},
"dependencies": ["Literature Review"],
"duration": 30,
},
{
"title": "Implementation",
"description": "Implement the solution following the designed architecture",
"agents": {"development_agent": "Code the solution with comprehensive documentation"},
"dependencies": ["Technical Architecture"],
"duration": 90,
},
{
"title": "Testing & Validation",
"description": "Comprehensive testing including automated browser testing",
"agents": {
"testing_agent": "Create and execute test suites using Playwright automation"
},
"dependencies": ["Implementation"],
"duration": 60,
},
],
}
AGENTIC_DEVELOPMENT_WORKFLOW = {
"title": "Multi-Agent Development Workflow",
"description": "Coordinated development workflow with specialized agent roles",
"tasks": [
{
"title": "Requirements Analysis",
"description": "Analyze requirements and create development plan",
"agents": {
"orchestrator_agent": "Coordinate requirements gathering and create task breakdown"
},
"duration": 30,
},
{
"title": "Parallel Development",
"description": "Multiple agents work on different components simultaneously",
"agents": {
"development_agent": "Implement core functionality",
"testing_agent": "Create test framework and initial tests",
"integration_agent": "Setup deployment pipeline",
},
"dependencies": ["Requirements Analysis"],
"duration": 120,
},
{
"title": "Integration & Testing",
"description": "Integrate components and run comprehensive tests",
"agents": {
"testing_agent": "Execute full test suite with browser automation",
"integration_agent": "Deploy and validate integration",
},
"dependencies": ["Parallel Development"],
"duration": 45,
},
],
}
async def main():
"""Demo of orchestrator capabilities"""
orchestrator = AgenticOrchestrator()
# Initialize session
await orchestrator.initialize_session()
# Create a research workflow
workflow_id = await orchestrator.create_workflow(RESEARCH_WORKFLOW)
# Wait a bit and generate status report
await asyncio.sleep(5)
report = await orchestrator.status_report()
print(json.dumps(report, indent=2))
# Save state
orchestrator.save_state()
if __name__ == "__main__":
asyncio.run(main())