{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<center>\n",
" <p style=\"text-align:center\">\n",
" <img alt=\"phoenix logo\" src=\"https://raw.githubusercontent.com/Arize-ai/phoenix-assets/9e6101d95936f4bd4d390efc9ce646dc6937fb2d/images/socal/github-large-banner-phoenix.jpg\" width=\"1000\"/>\n",
" <br>\n",
" <br>\n",
" <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
" |\n",
" <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
" |\n",
" <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n",
" </p>\n",
"</center>\n",
"<h1 align=\"center\">Tracing CrewAI with Arize Phoenix - Routing Workflow</h1>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install -q arize-phoenix opentelemetry-sdk opentelemetry-exporter-otlp crewai crewai_tools openinference-instrumentation-crewai"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "5-gPdVmIndw9"
},
"source": [
"## Set up Keys and Dependencies"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: For this colab you'll need:\n",
"\n",
"* OpenAI API key (https://openai.com/)\n",
"* Serper API key (https://serper.dev/)\n",
"* Phoenix API key (https://app.phoenix.arize.com/)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from getpass import getpass\n",
"\n",
"# Prompt the user for their API keys if they haven't been set\n",
"openai_key = os.getenv(\"OPENAI_API_KEY\", \"OPENAI_API_KEY\")\n",
"serper_key = os.getenv(\"SERPER_API_KEY\", \"SERPER_API_KEY\")\n",
"\n",
"if openai_key == \"OPENAI_API_KEY\":\n",
" openai_key = getpass(\"Please enter your OPENAI_API_KEY: \")\n",
"\n",
"if serper_key == \"SERPER_API_KEY\":\n",
" serper_key = getpass(\"Please enter your SERPER_API_KEY: \")\n",
"\n",
"# Set the environment variables with the provided keys\n",
"os.environ[\"OPENAI_API_KEY\"] = openai_key\n",
"os.environ[\"SERPER_API_KEY\"] = serper_key"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if \"PHOENIX_API_KEY\" not in os.environ:\n",
" os.environ[\"PHOENIX_API_KEY\"] = getpass(\"🔑 Enter your Phoenix API key: \")\n",
"\n",
"if \"PHOENIX_COLLECTOR_ENDPOINT\" not in os.environ:\n",
" os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = getpass(\"🔑 Enter your Phoenix Collector Endpoint\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "r9X87mdGnpbc"
},
"source": [
"## Configure Tracing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.otel import register\n",
"\n",
"tracer_provider = register(project_name=\"crewai-agents\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "vYT-EU56ni94"
},
"source": [
"# Instrument CrewAI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from openinference.instrumentation.crewai import CrewAIInstrumentor\n",
"\n",
"CrewAIInstrumentor().instrument(skip_dep_check=True, tracer_provider=tracer_provider)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define your Working Agents"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import nest_asyncio\n",
"from crewai import Agent, Crew, Process, Task\n",
"from crewai.flow import Flow, listen, router, start\n",
"from pydantic import BaseModel\n",
"\n",
"research_analyst = Agent(\n",
" role=\"Senior Research Analyst\",\n",
" goal=\"Gather and summarize data on the requested topic.\",\n",
" backstory=\"Expert in tech market trends.\",\n",
" allow_delegation=False,\n",
")\n",
"\n",
"content_strategist = Agent(\n",
" role=\"Tech Content Strategist\",\n",
" goal=\"Craft an article outline based on provided research.\",\n",
" backstory=\"Storyteller who turns data into narratives.\",\n",
" allow_delegation=False,\n",
")"
]
},
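{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, the Serper key collected earlier can back a live web-search tool for the research analyst via ```crewai_tools```. The cell below is a sketch and is not required for the routing examples that follow; it assumes ```SerperDevTool``` from the ```crewai_tools``` package installed above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sketch: give the research analyst a web-search tool backed by SERPER_API_KEY.\n",
"# The routing examples below also work without this.\n",
"from crewai_tools import SerperDevTool\n",
"\n",
"search_tool = SerperDevTool()\n",
"\n",
"research_analyst_with_search = Agent(\n",
"    role=\"Senior Research Analyst\",\n",
"    goal=\"Gather and summarize data on the requested topic.\",\n",
"    backstory=\"Expert in tech market trends.\",\n",
"    tools=[search_tool],\n",
"    allow_delegation=False,\n",
")"
]
},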
{
"cell_type": "markdown",
"metadata": {},
"source": [
"From here, there are two ways to do this -- through a routing Agent or through ```@router()``` decorator in Flows (allows you to define conditional routing logic based on the output of a method)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Option 1: Define your logic for Router Agent to classify the query & run corresponding Agent"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"routerAgent = Agent(\n",
" role=\"Router\",\n",
" goal=\"Classify each query as either 'research' or 'content outline'.\",\n",
" backstory=\"Triage bot for content workflows.\",\n",
" verbose=False,\n",
")\n",
"\n",
"\n",
"def route(user_input: str, router):\n",
" router_task = Task(\n",
" description=user_input, agent=router, expected_output=\"One word: 'research' or 'content'\"\n",
" )\n",
" router_classify = Crew(\n",
" agents=[router], tasks=[router_task], process=Process.sequential, verbose=False\n",
" )\n",
" router_results = router_classify.kickoff()\n",
" return router_results\n",
"\n",
"\n",
"def type_of_task(router_results):\n",
" if isinstance(router_results, list):\n",
" result = router_results[0]\n",
" result_text = result.text if hasattr(result, \"text\") else str(result)\n",
" else:\n",
" result_text = (\n",
" router_results.text if hasattr(router_results, \"text\") else str(router_results)\n",
" )\n",
" task_type = result_text.strip().lower()\n",
"\n",
" return task_type\n",
"\n",
"\n",
"def working_agent(task_type, user_input: str):\n",
" if \"research\" in task_type:\n",
" agent = research_analyst\n",
" label = \"Research Analyst\"\n",
" else:\n",
" agent = content_strategist\n",
" label = \"Content Strategist\"\n",
"\n",
" work_task = Task(description=user_input, agent=agent, expected_output=\"Agent response\")\n",
" worker_crew = Crew(agents=[agent], tasks=[work_task], process=Process.sequential, verbose=True)\n",
" work_results = worker_crew.kickoff()\n",
" if isinstance(work_results, list):\n",
" output = work_results[0].text if hasattr(work_results[0], \"text\") else str(work_results[0])\n",
" else:\n",
" output = work_results.text if hasattr(work_results, \"text\") else str(work_results)\n",
"\n",
" print(f\"\\n=== Routed to {label} ({task_type}) ===\\n{output}\\n\")\n",
" return output"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Examples Runs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ─── 4) Example Runs ─────────────────────────────────────────────────────────\n",
"for query in [\n",
" \"Please research the latest AI safety papers.\",\n",
" \"Outline an article on AI safety trends.\",\n",
"]:\n",
" router_output = route(query, routerAgent)\n",
" task_output = type_of_task(router_output)\n",
" working_agent(task_output, query)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Option 2: Define your logic for ```@router()``` Decorator to define routing logic"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"nest_asyncio.apply()\n",
"\n",
"\n",
"# Define Flow State\n",
"class RoutingState(BaseModel):\n",
" query: str = \"\"\n",
" route: str = \"\"\n",
"\n",
"\n",
"# Define Structured Flow\n",
"class RoutingFlow(Flow[RoutingState]):\n",
" def __init__(self, query: str):\n",
" super().__init__(state=RoutingState(query=query))\n",
"\n",
" @start()\n",
" def handle_query(self):\n",
" print(f\"📥 Incoming Query: {self.state.query}\")\n",
"\n",
" @router(handle_query)\n",
" def decide_route(self):\n",
" if \"research\" in self.state.query.lower():\n",
" self.state.route = \"research\"\n",
" return \"research\"\n",
" else:\n",
" self.state.route = \"outline\"\n",
" return \"outline\"\n",
"\n",
" @listen(\"research\")\n",
" def run_research(self):\n",
" task = Task(\n",
" description=self.state.query,\n",
" expected_output=\"Summary of findings on AI safety\",\n",
" agent=research_analyst,\n",
" )\n",
" crew = Crew(\n",
" agents=[research_analyst], tasks=[task], process=Process.sequential, verbose=True\n",
" )\n",
" crew.kickoff()\n",
"\n",
" @listen(\"outline\")\n",
" def run_content_strategy(self):\n",
" task = Task(\n",
" description=self.state.query,\n",
" expected_output=\"An article outline about the given topic\",\n",
" agent=content_strategist,\n",
" )\n",
" crew = Crew(\n",
" agents=[content_strategist], tasks=[task], process=Process.sequential, verbose=True\n",
" )\n",
" crew.kickoff()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Examples Runs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"queries = [\n",
" \"Please research the latest AI safety papers.\",\n",
" \"Outline an article on AI safety trends.\",\n",
"]\n",
"\n",
"for query in queries:\n",
" flow = RoutingFlow(query=query)\n",
" flow.kickoff()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fH0uVMgxpLql"
},
"source": [
"### Check your Phoenix project to view the traces and spans from your runs."
]
}
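,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can also pull the recorded spans back into a dataframe with the Phoenix client. This is a minimal sketch; it assumes the client picks up the collector endpoint and API key from the environment variables set earlier and that spans have been logged to the ```crewai-agents``` project."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sketch: fetch the spans recorded above as a pandas dataframe.\n",
"# Assumes PHOENIX_API_KEY and PHOENIX_COLLECTOR_ENDPOINT are set (see setup above).\n",
"import phoenix as px\n",
"\n",
"spans_df = px.Client().get_spans_dataframe(project_name=\"crewai-agents\")\n",
"spans_df.head()"
]
}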
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}