---
description: AGNO framework, Python, workflow orchestration, and stateful multi-agent systems
globs:
alwaysApply: false
---
> You are an expert in AGNO framework, Python, workflow orchestration, and stateful multi-agent systems. You focus on building robust, production-ready workflows with proper state management, error handling, and agent coordination for complex business processes.
## AGNO Workflows Architecture Flow
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Workflow Design │ │ Agent Orchestra │ │ Execution │
│ - Multi-Step │───▶│ - Coordination │───▶│ - Streaming │
│ - State Mgmt │ │ - Specialization│ │ - Progress │
│ - Error Handling│ │ - Flow Control │ │ - Results │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Storage Layer │ │ Validation │ │ Production │
│ - Session State│ │ - Input Schema │ │ - Persistence │
│ - Persistence │ │ - Error Recovery│ │ - Monitoring │
│ - Caching │ │ - Type Safety │ │ - Scalability │
└─────────────────┘ └──────────────────┘ └─────────────────┘
```
# AGNO Workflows Guidelines
Workflows in AGNO are deterministic, stateful multi-agent programs designed for production applications. They offer the flexibility of pure Python with the power of agentic systems, perfect for complex, multi-step processes.
## Core Workflow Concepts
1. **Pure Python Logic**: Use standard Python control flow (if/else, loops, exceptions)
2. **Stateful Execution**: Built-in storage for caching and state management
3. **Full Control and Flexibility**: Manage the entire workflow process
4. **Agent Orchestration**: Coordinate multiple agents to complete complex tasks
## Basic Workflow Implementation
```python
from typing import Iterator
from agno.agent import Agent, RunResponse
from agno.models.openai import OpenAIChat
from agno.workflow import Workflow
class SimpleWorkflow(Workflow):
# Class description (for documentation)
description: str = "A simple workflow demonstration"
# Define agent(s) as class attributes
agent = Agent(model=OpenAIChat(id="gpt-4o"))
# Main workflow logic in the run method
def run(self, message: str) -> Iterator[RunResponse]:
# Process the input
processed_message = f"Process this request: {message}"
# Run the agent and yield responses (streaming)
yield from self.agent.run(processed_message, stream=True)
# Instantiate and run the workflow
if __name__ == "__main__":
workflow = SimpleWorkflow()
response = workflow.run("Explain quantum computing")
# Print or process the response iterator
```
## Workflow with State Management
State management is a key feature of workflows, allowing you to cache results and track progress:
```python
from typing import Iterator
from agno.agent import Agent, RunResponse
from agno.models.openai import OpenAIChat
from agno.workflow import Workflow
class CacheWorkflow(Workflow):
description: str = "A workflow that caches previous outputs"
agent = Agent(model=OpenAIChat(id="gpt-4o"))
def run(self, message: str) -> Iterator[RunResponse]:
# Check if the output is already cached
if self.session_state.get(message):
# Return cached result if available
yield RunResponse(
run_id=self.run_id,
content=self.session_state.get(message)
)
return
# If not cached, run the agent
yield from self.agent.run(message, stream=True)
# Cache the output after response is yielded
self.session_state[message] = self.agent.run_response.content
```
## Multi-Step Workflow
Workflows excel at orchestrating complex multi-step processes with multiple agents:
```python
from typing import Iterator, Dict, Any
from agno.agent import Agent, RunResponse
from agno.models.anthropic import Claude
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.newspaper4k import Newspaper4kTools
from agno.workflow import Workflow
class ResearchWorkflow(Workflow):
description: str = "A research workflow that searches the web, extracts content, and summarizes findings"
# Define multiple agents with different responsibilities
search_agent = Agent(
name="SearchAgent",
model=Claude(id="claude-3-5-sonnet-latest"),
tools=[DuckDuckGoTools()],
instructions=["Find the most relevant resources about the topic"],
)
content_agent = Agent(
name="ContentAgent",
model=Claude(id="claude-3-5-sonnet-latest"),
tools=[Newspaper4kTools()],
instructions=["Extract and summarize the main points from articles"],
)
summary_agent = Agent(
name="SummaryAgent",
model=Claude(id="claude-3-7-sonnet-latest"),
instructions=[
"Synthesize information into a comprehensive summary",
"Include all key points and maintain factual accuracy",
"Format the summary in markdown with clear sections",
],
)
def run(self, topic: str) -> Iterator[RunResponse]:
# Step 1: Search for relevant information
yield RunResponse(run_id=self.run_id, content=f"🔍 Searching for information on: {topic}...")
search_response = self.search_agent.run(f"Find recent and reliable information about: {topic}")
# Extract URLs from search results
urls = self._extract_urls_from_response(search_response.content)
if not urls:
yield RunResponse(run_id=self.run_id, content="⚠️ No relevant sources found.")
return
# Store urls in session state for future reference
self.session_state["urls"] = urls
# Step 2: Extract content from each URL
yield RunResponse(run_id=self.run_id, content=f"📄 Extracting content from {len(urls)} sources...")
articles = []
for url in urls[:3]: # Limit to top 3 sources for efficiency
content_prompt = f"Extract and summarize the key information from this article: {url}"
content_response = self.content_agent.run(content_prompt)
articles.append(content_response.content)
# Store articles in session state
self.session_state["articles"] = articles
# Step 3: Generate final summary
yield RunResponse(run_id=self.run_id, content="✍️ Generating comprehensive summary...")
summary_prompt = f"""
Create a detailed summary on the topic: {topic}
Based on these sources:
{articles}
Provide a well-structured summary with proper citations.
"""
# Stream the final summary
yield from self.summary_agent.run(summary_prompt, stream=True)
def _extract_urls_from_response(self, response: str) -> list[str]:
"""Helper method to extract URLs from search results"""
# Implementation details...
return ["http://example.com/1", "http://example.com/2"] # Simplified example
```
## Error Handling and Validation
Proper error handling and validation are critical for production workflows:
```python
from typing import Iterator, Optional
from pydantic import BaseModel, ValidationError
from agno.agent import Agent, RunResponse
from agno.models.anthropic import Claude
from agno.workflow import Workflow
class UserInput(BaseModel):
name: str
email: str
query: str
class ValidationWorkflow(Workflow):
description: str = "A workflow with input validation and error handling"
agent = Agent(model=Claude(id="claude-3-7-sonnet-latest"))
def run(self, input_data: dict) -> Iterator[RunResponse]:
# Validate input with Pydantic
try:
user_input = UserInput(**input_data)
except ValidationError as e:
yield RunResponse(
run_id=self.run_id,
content=f"⚠️ Input validation error: {str(e)}"
)
return
# Process with error handling
try:
# Personalized response using validated input
prompt = f"Hello {user_input.name}, here's the answer to your query: {user_input.query}"
yield from self.agent.run(prompt, stream=True)
# Store successful interaction
self._log_interaction(user_input)
except Exception as e:
yield RunResponse(
run_id=self.run_id,
content=f"❌ Error processing request: {str(e)}"
)
# Attempt error recovery if possible
if self._can_recover(e):
yield from self._recovery_process(user_input)
def _log_interaction(self, user_input: UserInput) -> None:
"""Log successful interactions"""
if "interactions" not in self.session_state:
self.session_state["interactions"] = []
self.session_state["interactions"].append({
"email": user_input.email,
"query": user_input.query,
"timestamp": datetime.now().isoformat()
})
def _can_recover(self, error: Exception) -> bool:
"""Check if error is recoverable"""
# Implementation details...
return False
def _recovery_process(self, user_input: UserInput) -> Iterator[RunResponse]:
"""Recovery process for errors"""
# Implementation details...
yield RunResponse(run_id=self.run_id, content="Attempting recovery...")
```
## Workflow Best Practices
1. **State Management**:
- Use `self.session_state` to cache intermediate results
- Structure state data with descriptive keys
- Clear state when no longer needed to prevent bloat
2. **Error Handling**:
- Implement try/except blocks around external API calls
- Validate inputs early in the workflow
- Provide informative error messages and recovery paths
3. **Workflow Design**:
- Break complex tasks into smaller functions
- Use descriptive variable and function names
- Document workflow purpose and key steps
4. **Performance Optimization**:
- Cache expensive operations
- Use streaming responses with `yield from agent.run(prompt, stream=True)`
- Consider parallel processing for independent steps
5. **Agent Management**:
- Create specialized agents for different tasks
- Set appropriate models based on task complexity
- Provide clear, specific instructions to each agent
## Storage and Persistence
For production workflows, configure external storage to maintain state across runs:
```python
from agno.workflow import Workflow
from agno.workflow.storage.sqlite import SqliteWorkflowStorage
# Configure workflow with persistent storage
class PersistentWorkflow(Workflow):
# Configure storage during initialization
def __init__(self):
super().__init__(
# Persist state in SQLite database
storage=SqliteWorkflowStorage(
db_file="data/workflows.db",
table_name="workflow_sessions"
)
)
# Run method implementation
def run(self, input_data: dict) -> Iterator[RunResponse]:
# State will be loaded from database if session_id exists
# ...implementation...
# State will be saved to database after completion
pass
```