"""
Complete LangGraph Workflow Engine for Multi-Agent Orchestration.
Implements sophisticated workflow patterns with:
- State-based agent coordination
- Dynamic workflow routing
- Error handling and recovery
- Memory persistence integration
- Real-time monitoring
"""
import asyncio
from typing import Dict, List, Any, Optional, TypedDict, Annotated
from datetime import datetime
import logging
from enum import Enum
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
from langgraph.checkpoint.sqlite import SqliteSaver
from ..models.agent_models import AgentConfig, AgentMessage, AgentResponse
from ..models.workflow_models import WorkflowState, UserIntent, AgentTask
from ..services.groq_service import GroqLLMService
from ..services.memory_service import MemoryService
from ..agents.orchestrator import UltraOrchestratorAgent
from ..agents.architect_agent import ArchitectAgent
from ..agents.codebase_memory_agent import CodebaseMemoryAgent
from ..agents.debug_refactor_agent import DebugRefactorAgent
logger = logging.getLogger(__name__)
class WorkflowStep(str, Enum):
"""Workflow execution steps."""
INTENT_ANALYSIS = "intent_analysis"
AGENT_ROUTING = "agent_routing"
ARCHITECT_PLANNING = "architect_planning"
CODEBASE_ANALYSIS = "codebase_analysis"
DEBUG_ANALYSIS = "debug_analysis"
TASK_BREAKDOWN = "task_breakdown"
DOCUMENT_RESEARCH = "document_research"
EXECUTION_MONITORING = "execution_monitoring"
RESULT_SYNTHESIS = "result_synthesis"
class WorkflowContext(TypedDict):
"""Complete workflow context with state management."""
# Input and routing
user_query: str
user_intent: Optional[UserIntent]
workflow_type: str
active_agent: Optional[str]
# Agent responses
orchestrator_response: Optional[Dict[str, Any]]
architect_response: Optional[Dict[str, Any]]
codebase_response: Optional[Dict[str, Any]]
debug_response: Optional[Dict[str, Any]]
task_response: Optional[Dict[str, Any]]
document_response: Optional[Dict[str, Any]]
# State management
current_step: str
completed_steps: List[str]
pending_steps: List[str]
error_state: Optional[str]
# Memory and context
session_id: str
project_context: Dict[str, Any]
memory_queries: List[Dict[str, Any]]
cross_layer_relationships: List[Dict[str, Any]]
# Results and output
final_response: Optional[Dict[str, Any]]
generated_tasks: List[str]
created_relationships: List[str]
# Metadata
workflow_id: str
started_at: datetime
updated_at: datetime
execution_metrics: Dict[str, Any]
class AgentWorkflowEngine:
"""
Complete LangGraph-based workflow engine for multi-agent coordination.
Provides sophisticated workflow orchestration with:
- Dynamic agent routing based on context
- State persistence and recovery
- Cross-layer memory integration
- Real-time monitoring and metrics
- Error handling and retry mechanisms
"""
def __init__(
self,
llm_service: GroqLLMService,
memory_service: MemoryService,
codebase_path: Optional[str] = None
):
self.llm = llm_service
self.memory = memory_service
self.codebase_path = codebase_path
# Initialize agents
self.agents = {}
self.workflow_graph = None
self.checkpointer = SqliteSaver.from_conn_string(":memory:")
# Workflow metrics
self.execution_metrics = {
"total_workflows": 0,
"successful_workflows": 0,
"failed_workflows": 0,
"average_execution_time": 0.0,
"agent_usage": {}
}
async def initialize(self) -> bool:
"""Initialize the workflow engine and all agents."""
try:
# Initialize agents
await self._initialize_agents()
# Build workflow graph
self._build_workflow_graph()
logger.info("Agent workflow engine initialized successfully")
return True
except Exception as e:
logger.error(f"Failed to initialize workflow engine: {e}")
return False
async def _initialize_agents(self):
"""Initialize all specialized agents."""
try:
# Ultra Orchestrator Agent
orchestrator_config = AgentConfig(
agent_id="ultra_orchestrator",
agent_type="orchestrator",
capabilities=["intent_analysis", "workflow_routing", "context_management"],
max_concurrent_tasks=10
)
self.agents["orchestrator"] = UltraOrchestratorAgent(
orchestrator_config, self.llm, self.memory
)
# Architect Agent
architect_config = AgentConfig(
agent_id="architect_agent",
agent_type="architect",
capabilities=["design_planning", "architecture_optimization", "tech_stack_analysis"],
max_concurrent_tasks=5
)
self.agents["architect"] = ArchitectAgent(
architect_config, self.llm, self.memory
)
# Codebase Memory Agent
if self.codebase_path:
codebase_config = AgentConfig(
agent_id="codebase_memory_agent",
agent_type="codebase_memory",
capabilities=["code_indexing", "real_time_monitoring", "relationship_mapping"],
max_concurrent_tasks=3
)
self.agents["codebase"] = CodebaseMemoryAgent(
codebase_config, self.llm, self.memory, self.codebase_path
)
# Debug Refactor Agent
debug_config = AgentConfig(
agent_id="debug_refactor_agent",
agent_type="debug_refactor",
capabilities=["code_analysis", "bug_detection", "refactoring_plans"],
max_concurrent_tasks=3
)
self.agents["debug"] = DebugRefactorAgent(
debug_config, self.llm, self.memory
)
# Initialize all agents
for agent_name, agent in self.agents.items():
await agent.initialize({})
logger.info(f"Initialized {agent_name} agent")
except Exception as e:
logger.error(f"Failed to initialize agents: {e}")
raise
def _build_workflow_graph(self):
"""Build the LangGraph workflow with all states and transitions."""
try:
# Create workflow graph
workflow = StateGraph(WorkflowContext)
# Add nodes for each workflow step
workflow.add_node(WorkflowStep.INTENT_ANALYSIS, self._intent_analysis_node)
workflow.add_node(WorkflowStep.AGENT_ROUTING, self._agent_routing_node)
workflow.add_node(WorkflowStep.ARCHITECT_PLANNING, self._architect_planning_node)
workflow.add_node(WorkflowStep.CODEBASE_ANALYSIS, self._codebase_analysis_node)
workflow.add_node(WorkflowStep.DEBUG_ANALYSIS, self._debug_analysis_node)
workflow.add_node(WorkflowStep.TASK_BREAKDOWN, self._task_breakdown_node)
workflow.add_node(WorkflowStep.DOCUMENT_RESEARCH, self._document_research_node)
workflow.add_node(WorkflowStep.EXECUTION_MONITORING, self._execution_monitoring_node)
workflow.add_node(WorkflowStep.RESULT_SYNTHESIS, self._result_synthesis_node)
# Define workflow edges and conditional routing
workflow.set_entry_point(WorkflowStep.INTENT_ANALYSIS)
# Intent Analysis -> Agent Routing (always)
workflow.add_edge(WorkflowStep.INTENT_ANALYSIS, WorkflowStep.AGENT_ROUTING)
# Agent Routing -> Conditional routing based on intent
workflow.add_conditional_edges(
WorkflowStep.AGENT_ROUTING,
self._route_to_agent,
{
"architect": WorkflowStep.ARCHITECT_PLANNING,
"codebase": WorkflowStep.CODEBASE_ANALYSIS,
"debug": WorkflowStep.DEBUG_ANALYSIS,
"task": WorkflowStep.TASK_BREAKDOWN,
"document": WorkflowStep.DOCUMENT_RESEARCH,
"synthesis": WorkflowStep.RESULT_SYNTHESIS
}
)
# All agent nodes -> Execution Monitoring
workflow.add_edge(WorkflowStep.ARCHITECT_PLANNING, WorkflowStep.EXECUTION_MONITORING)
workflow.add_edge(WorkflowStep.CODEBASE_ANALYSIS, WorkflowStep.EXECUTION_MONITORING)
workflow.add_edge(WorkflowStep.DEBUG_ANALYSIS, WorkflowStep.EXECUTION_MONITORING)
workflow.add_edge(WorkflowStep.TASK_BREAKDOWN, WorkflowStep.EXECUTION_MONITORING)
workflow.add_edge(WorkflowStep.DOCUMENT_RESEARCH, WorkflowStep.EXECUTION_MONITORING)
# Execution Monitoring -> Result Synthesis or back to Agent Routing
workflow.add_conditional_edges(
WorkflowStep.EXECUTION_MONITORING,
self._check_completion,
{
"continue": WorkflowStep.AGENT_ROUTING,
"synthesize": WorkflowStep.RESULT_SYNTHESIS,
"end": END
}
)
# Result Synthesis -> END
workflow.add_edge(WorkflowStep.RESULT_SYNTHESIS, END)
# Compile workflow with checkpointing
self.workflow_graph = workflow.compile(checkpointer=self.checkpointer)
logger.info("Workflow graph built successfully")
except Exception as e:
logger.error(f"Failed to build workflow graph: {e}")
raise
# ================================
# WORKFLOW EXECUTION
# ================================
async def execute_workflow(
self,
user_query: str,
session_id: str,
project_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Execute complete workflow for user query."""
workflow_id = f"workflow_{datetime.utcnow().isoformat()}"
start_time = datetime.utcnow()
try:
# Initialize workflow context
initial_context = WorkflowContext(
user_query=user_query,
user_intent=None,
workflow_type="user_query",
active_agent=None,
orchestrator_response=None,
architect_response=None,
codebase_response=None,
debug_response=None,
task_response=None,
document_response=None,
current_step=WorkflowStep.INTENT_ANALYSIS,
completed_steps=[],
pending_steps=[],
error_state=None,
session_id=session_id,
project_context=project_context or {},
memory_queries=[],
cross_layer_relationships=[],
final_response=None,
generated_tasks=[],
created_relationships=[],
workflow_id=workflow_id,
started_at=start_time,
updated_at=start_time,
execution_metrics={}
)
# Execute workflow
final_state = None
async for state in self.workflow_graph.astream(
initial_context,
config={"configurable": {"thread_id": workflow_id}}
):
final_state = state
logger.debug(f"Workflow step completed: {state.get('current_step')}")
# Calculate execution metrics
end_time = datetime.utcnow()
execution_time = (end_time - start_time).total_seconds()
# Update metrics
self._update_execution_metrics(execution_time, True)
# Return final results
result = {
"workflow_id": workflow_id,
"success": True,
"execution_time": execution_time,
"final_response": final_state.get("final_response") if final_state else None,
"generated_tasks": final_state.get("generated_tasks", []) if final_state else [],
"created_relationships": final_state.get("created_relationships", []) if final_state else [],
"completed_steps": final_state.get("completed_steps", []) if final_state else []
}
logger.info(f"Workflow {workflow_id} completed successfully in {execution_time:.2f}s")
return result
except Exception as e:
logger.error(f"Workflow {workflow_id} failed: {e}")
self._update_execution_metrics(0, False)
return {
"workflow_id": workflow_id,
"success": False,
"error": str(e),
"execution_time": (datetime.utcnow() - start_time).total_seconds()
}
# ================================
# WORKFLOW NODES
# ================================
async def _intent_analysis_node(self, state: WorkflowContext) -> WorkflowContext:
"""Analyze user intent and set workflow direction."""
try:
message = AgentMessage(
content=state["user_query"],
sender_id="user",
message_type="query",
metadata={"session_id": state["session_id"]}
)
response = await self.agents["orchestrator"].analyze_user_intent(message)
if response.success:
state["user_intent"] = response.data.get("intent")
state["orchestrator_response"] = response.data
state["workflow_type"] = response.data.get("workflow_type", "general")
else:
state["error_state"] = f"Intent analysis failed: {response.error}"
state["current_step"] = WorkflowStep.AGENT_ROUTING
state["completed_steps"].append(WorkflowStep.INTENT_ANALYSIS)
state["updated_at"] = datetime.utcnow()
return state
except Exception as e:
logger.error(f"Intent analysis node failed: {e}")
state["error_state"] = f"Intent analysis error: {str(e)}"
return state
async def _agent_routing_node(self, state: WorkflowContext) -> WorkflowContext:
"""Route to appropriate specialized agent based on intent."""
try:
intent = state.get("user_intent")
if not intent:
state["active_agent"] = "orchestrator"
state["current_step"] = WorkflowStep.RESULT_SYNTHESIS
return state
# Determine which agent should handle this intent
if intent.intent_type in ["architecture", "design", "planning"]:
state["active_agent"] = "architect"
state["current_step"] = WorkflowStep.ARCHITECT_PLANNING
elif intent.intent_type in ["codebase", "code_analysis", "indexing"]:
state["active_agent"] = "codebase"
state["current_step"] = WorkflowStep.CODEBASE_ANALYSIS
elif intent.intent_type in ["debug", "refactor", "code_quality"]:
state["active_agent"] = "debug"
state["current_step"] = WorkflowStep.DEBUG_ANALYSIS
elif intent.intent_type in ["task", "breakdown", "project_management"]:
state["current_step"] = WorkflowStep.TASK_BREAKDOWN
elif intent.intent_type in ["documentation", "research"]:
state["current_step"] = WorkflowStep.DOCUMENT_RESEARCH
else:
state["current_step"] = WorkflowStep.RESULT_SYNTHESIS
state["completed_steps"].append(WorkflowStep.AGENT_ROUTING)
state["updated_at"] = datetime.utcnow()
return state
except Exception as e:
logger.error(f"Agent routing node failed: {e}")
state["error_state"] = f"Agent routing error: {str(e)}"
return state
async def _architect_planning_node(self, state: WorkflowContext) -> WorkflowContext:
"""Execute architect agent planning workflow."""
try:
if "architect" not in self.agents:
state["error_state"] = "Architect agent not available"
return state
message = AgentMessage(
content=state["user_query"],
sender_id="orchestrator",
message_type="design_request",
metadata={
"session_id": state["session_id"],
"user_intent": state.get("user_intent"),
"project_context": state.get("project_context", {})
}
)
response = await self.agents["architect"].process_design_request(message)
if response.success:
state["architect_response"] = response.data
# Extract generated tasks
if "grand_plan_id" in response.data:
state["generated_tasks"].append(response.data["grand_plan_id"])
else:
state["error_state"] = f"Architect planning failed: {response.error}"
state["completed_steps"].append(WorkflowStep.ARCHITECT_PLANNING)
state["current_step"] = WorkflowStep.EXECUTION_MONITORING
state["updated_at"] = datetime.utcnow()
return state
except Exception as e:
logger.error(f"Architect planning node failed: {e}")
state["error_state"] = f"Architect planning error: {str(e)}"
return state
async def _codebase_analysis_node(self, state: WorkflowContext) -> WorkflowContext:
"""Execute codebase analysis workflow."""
try:
if "codebase" not in self.agents:
state["error_state"] = "Codebase agent not available"
return state
# Determine analysis type based on intent
analysis_type = "incremental"
if state.get("user_intent") and "full" in state["user_intent"].parameters:
analysis_type = "full"
# Execute codebase analysis
stats = await self.agents["codebase"].index_codebase()
state["codebase_response"] = {
"analysis_type": analysis_type,
"statistics": stats,
"indexed_files": stats.get("files_processed", 0)
}
state["completed_steps"].append(WorkflowStep.CODEBASE_ANALYSIS)
state["current_step"] = WorkflowStep.EXECUTION_MONITORING
state["updated_at"] = datetime.utcnow()
return state
except Exception as e:
logger.error(f"Codebase analysis node failed: {e}")
state["error_state"] = f"Codebase analysis error: {str(e)}"
return state
async def _debug_analysis_node(self, state: WorkflowContext) -> WorkflowContext:
"""Execute debug and refactor analysis workflow."""
try:
if "debug" not in self.agents:
state["error_state"] = "Debug agent not available"
return state
# Execute debug analysis
response = await self.agents["debug"].analyze_codebase("incremental")
if response.success:
state["debug_response"] = response.data
# Track refactoring plans as tasks
plans_created = response.data.get("refactoring_plans", 0)
for i in range(plans_created):
state["generated_tasks"].append(f"refactoring_plan_{i}")
else:
state["error_state"] = f"Debug analysis failed: {response.error}"
state["completed_steps"].append(WorkflowStep.DEBUG_ANALYSIS)
state["current_step"] = WorkflowStep.EXECUTION_MONITORING
state["updated_at"] = datetime.utcnow()
return state
except Exception as e:
logger.error(f"Debug analysis node failed: {e}")
state["error_state"] = f"Debug analysis error: {str(e)}"
return state
async def _task_breakdown_node(self, state: WorkflowContext) -> WorkflowContext:
"""Execute task breakdown workflow."""
try:
# Use orchestrator for task breakdown
message = AgentMessage(
content=f"Break down the following into tasks: {state['user_query']}",
sender_id="workflow",
message_type="task_breakdown",
metadata={"session_id": state["session_id"]}
)
response = await self.agents["orchestrator"].process_message(message)
if response.success:
state["task_response"] = response.data
else:
state["error_state"] = f"Task breakdown failed: {response.error}"
state["completed_steps"].append(WorkflowStep.TASK_BREAKDOWN)
state["current_step"] = WorkflowStep.EXECUTION_MONITORING
state["updated_at"] = datetime.utcnow()
return state
except Exception as e:
logger.error(f"Task breakdown node failed: {e}")
state["error_state"] = f"Task breakdown error: {str(e)}"
return state
async def _document_research_node(self, state: WorkflowContext) -> WorkflowContext:
"""Execute document research workflow."""
try:
# Use memory service for document search
from ..models.memory_layers import MemoryQuery
query = MemoryQuery(
query_text=state["user_query"],
target_layers=["documents"],
max_results_per_layer=10
)
results = await self.memory.hybrid_search(query)
state["document_response"] = {
"documents_found": len(results.document_results),
"search_results": results.document_results
}
state["completed_steps"].append(WorkflowStep.DOCUMENT_RESEARCH)
state["current_step"] = WorkflowStep.EXECUTION_MONITORING
state["updated_at"] = datetime.utcnow()
return state
except Exception as e:
logger.error(f"Document research node failed: {e}")
state["error_state"] = f"Document research error: {str(e)}"
return state
async def _execution_monitoring_node(self, state: WorkflowContext) -> WorkflowContext:
"""Monitor execution and determine next steps."""
try:
# Check if we have enough information to synthesize results
responses = [
state.get("orchestrator_response"),
state.get("architect_response"),
state.get("codebase_response"),
state.get("debug_response"),
state.get("task_response"),
state.get("document_response")
]
active_responses = [r for r in responses if r is not None]
# Determine if we need more agent involvement
intent = state.get("user_intent")
if intent and len(active_responses) < 2:
# Need more information - route back to agents
state["current_step"] = WorkflowStep.AGENT_ROUTING
else:
# Ready to synthesize
state["current_step"] = WorkflowStep.RESULT_SYNTHESIS
state["completed_steps"].append(WorkflowStep.EXECUTION_MONITORING)
state["updated_at"] = datetime.utcnow()
return state
except Exception as e:
logger.error(f"Execution monitoring node failed: {e}")
state["error_state"] = f"Execution monitoring error: {str(e)}"
return state
async def _result_synthesis_node(self, state: WorkflowContext) -> WorkflowContext:
"""Synthesize results from all agents into final response."""
try:
# Collect all agent responses
agent_responses = {
"orchestrator": state.get("orchestrator_response"),
"architect": state.get("architect_response"),
"codebase": state.get("codebase_response"),
"debug": state.get("debug_response"),
"task": state.get("task_response"),
"document": state.get("document_response")
}
# Filter out None responses
active_responses = {k: v for k, v in agent_responses.items() if v is not None}
# Use orchestrator to synthesize final response
synthesis_message = AgentMessage(
content=f"Synthesize the following agent responses for user query: '{state['user_query']}'",
sender_id="workflow",
message_type="synthesis",
metadata={
"session_id": state["session_id"],
"agent_responses": active_responses,
"generated_tasks": state.get("generated_tasks", [])
}
)
response = await self.agents["orchestrator"].synthesize_response(synthesis_message)
if response.success:
state["final_response"] = response.data
else:
state["final_response"] = {
"message": "Workflow completed with partial results",
"agent_responses": active_responses,
"error": response.error
}
state["completed_steps"].append(WorkflowStep.RESULT_SYNTHESIS)
state["updated_at"] = datetime.utcnow()
return state
except Exception as e:
logger.error(f"Result synthesis node failed: {e}")
state["error_state"] = f"Result synthesis error: {str(e)}"
return state
# ================================
# WORKFLOW ROUTING LOGIC
# ================================
def _route_to_agent(self, state: WorkflowContext) -> str:
"""Route workflow to appropriate agent based on current state."""
try:
current_step = state.get("current_step")
active_agent = state.get("active_agent")
if current_step == WorkflowStep.ARCHITECT_PLANNING:
return "architect"
elif current_step == WorkflowStep.CODEBASE_ANALYSIS:
return "codebase"
elif current_step == WorkflowStep.DEBUG_ANALYSIS:
return "debug"
elif current_step == WorkflowStep.TASK_BREAKDOWN:
return "task"
elif current_step == WorkflowStep.DOCUMENT_RESEARCH:
return "document"
else:
return "synthesis"
except Exception as e:
logger.error(f"Agent routing failed: {e}")
return "synthesis"
def _check_completion(self, state: WorkflowContext) -> str:
"""Check if workflow should continue, synthesize, or end."""
try:
error_state = state.get("error_state")
if error_state:
return "end"
completed_steps = state.get("completed_steps", [])
# If we have multiple agent responses, synthesize
responses = [
state.get("architect_response"),
state.get("codebase_response"),
state.get("debug_response"),
state.get("task_response"),
state.get("document_response")
]
active_responses = [r for r in responses if r is not None]
if len(active_responses) >= 1:
return "synthesize"
elif len(completed_steps) > 8: # Prevent infinite loops
return "synthesize"
else:
return "continue"
except Exception as e:
logger.error(f"Completion check failed: {e}")
return "end"
# ================================
# METRICS AND MONITORING
# ================================
def _update_execution_metrics(self, execution_time: float, success: bool):
"""Update workflow execution metrics."""
try:
self.execution_metrics["total_workflows"] += 1
if success:
self.execution_metrics["successful_workflows"] += 1
else:
self.execution_metrics["failed_workflows"] += 1
# Update average execution time
total = self.execution_metrics["total_workflows"]
current_avg = self.execution_metrics["average_execution_time"]
self.execution_metrics["average_execution_time"] = (
(current_avg * (total - 1) + execution_time) / total
)
except Exception as e:
logger.error(f"Failed to update execution metrics: {e}")
def get_workflow_metrics(self) -> Dict[str, Any]:
"""Get comprehensive workflow execution metrics."""
return {
**self.execution_metrics,
"success_rate": (
self.execution_metrics["successful_workflows"] /
max(self.execution_metrics["total_workflows"], 1)
),
"agent_availability": {
name: agent is not None
for name, agent in self.agents.items()
}
}
# ================================
# WORKFLOW MANAGEMENT
# ================================
async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""Get status of a running workflow."""
try:
# This would query the checkpointer for workflow state
# For now, return basic status
return {
"workflow_id": workflow_id,
"status": "running",
"message": "Workflow status retrieval not fully implemented"
}
except Exception as e:
logger.error(f"Failed to get workflow status: {e}")
return {"error": str(e)}
async def cancel_workflow(self, workflow_id: str) -> bool:
"""Cancel a running workflow."""
try:
# This would implement workflow cancellation
logger.info(f"Workflow cancellation requested: {workflow_id}")
return True
except Exception as e:
logger.error(f"Failed to cancel workflow: {e}")
return False