@arizeai/phoenix-mcp

Official
by Arize-ai
crewai_routing_tutorial.ipynb (11.1 kB)
{ "cells": [
{ "cell_type": "markdown", "metadata": {}, "source": [ "<center>\n", "    <p style=\"text-align:center\">\n", "        <img alt=\"phoenix logo\" src=\"https://raw.githubusercontent.com/Arize-ai/phoenix-assets/9e6101d95936f4bd4d390efc9ce646dc6937fb2d/images/socal/github-large-banner-phoenix.jpg\" width=\"1000\"/>\n", "        <br>\n", "        <br>\n", "        <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n", "        |\n", "        <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n", "        |\n", "        <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n", "    </p>\n", "</center>\n", "<h1 align=\"center\">Tracing CrewAI with Arize Phoenix - Routing Workflow</h1>" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install -q arize-phoenix opentelemetry-sdk opentelemetry-exporter-otlp crewai crewai_tools openinference-instrumentation-crewai" ] },
{ "cell_type": "markdown", "metadata": { "id": "5-gPdVmIndw9" }, "source": [ "## Set up Keys and Dependencies" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Note: For this notebook you'll need:\n", "\n", "* OpenAI API key (https://openai.com/)\n", "* Serper API key (https://serper.dev/)\n", "* Phoenix API key (https://app.phoenix.arize.com/)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "from getpass import getpass\n", "\n", "# Prompt the user for their API keys if they haven't been set\n", "openai_key = os.getenv(\"OPENAI_API_KEY\", \"OPENAI_API_KEY\")\n", "serper_key = os.getenv(\"SERPER_API_KEY\", \"SERPER_API_KEY\")\n", "\n", "if openai_key == \"OPENAI_API_KEY\":\n", "    openai_key = getpass(\"Please enter your OPENAI_API_KEY: \")\n", "\n", "if serper_key == \"SERPER_API_KEY\":\n", "    serper_key = getpass(\"Please enter your SERPER_API_KEY: \")\n", "\n", "# Set the environment variables with the provided keys\n", "os.environ[\"OPENAI_API_KEY\"] = openai_key\n", "os.environ[\"SERPER_API_KEY\"] = serper_key" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if \"PHOENIX_API_KEY\" not in os.environ:\n", "    os.environ[\"PHOENIX_API_KEY\"] = getpass(\"🔑 Enter your Phoenix API key: \")\n", "\n", "if \"PHOENIX_COLLECTOR_ENDPOINT\" not in os.environ:\n", "    os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = getpass(\"🔑 Enter your Phoenix Collector Endpoint: \")" ] },
{ "cell_type": "markdown", "metadata": { "id": "r9X87mdGnpbc" }, "source": [ "## Configure Tracing" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from phoenix.otel import register\n", "\n", "tracer_provider = register(project_name=\"crewai-agents\")" ] },
{ "cell_type": "markdown", "metadata": { "id": "vYT-EU56ni94" }, "source": [ "## Instrument CrewAI" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from openinference.instrumentation.crewai import CrewAIInstrumentor\n", "\n", "CrewAIInstrumentor().instrument(skip_dep_check=True, tracer_provider=tracer_provider)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Define your Working Agents" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import nest_asyncio\n", "from crewai import Agent, Crew, Process, Task\n", "from crewai.flow import Flow, listen, router, start\n", "from pydantic import BaseModel\n", "\n", "research_analyst = Agent(\n", "    role=\"Senior Research Analyst\",\n", "    goal=\"Gather and summarize data on the requested topic.\",\n", "    backstory=\"Expert in tech market trends.\",\n", "    allow_delegation=False,\n", ")\n", "\n", "content_strategist = Agent(\n", "    role=\"Tech Content Strategist\",\n", "    goal=\"Craft an article outline based on provided research.\",\n", "    backstory=\"Storyteller who turns data into narratives.\",\n", "    allow_delegation=False,\n", ")" ] },
{ "cell_type": "markdown",
"metadata": {}, "source": [ "From here, there are two ways to route a query: with a dedicated router Agent, or with the `@router()` decorator in Flows, which lets you define conditional routing logic based on the output of a method." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Option 1: Use a Router Agent to classify the query and run the corresponding Agent" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "routerAgent = Agent(\n", "    role=\"Router\",\n", "    goal=\"Classify each query as either 'research' or 'content'.\",\n", "    backstory=\"Triage bot for content workflows.\",\n", "    verbose=False,\n", ")\n", "\n", "\n", "def route(user_input: str, router):\n", "    # Run a single-task crew whose only job is to label the query\n", "    router_task = Task(\n", "        description=user_input, agent=router, expected_output=\"One word: 'research' or 'content'\"\n", "    )\n", "    router_classify = Crew(\n", "        agents=[router], tasks=[router_task], process=Process.sequential, verbose=False\n", "    )\n", "    router_results = router_classify.kickoff()\n", "    return router_results\n", "\n", "\n", "def type_of_task(router_results):\n", "    # Normalize the router output to a lowercase, stripped string\n", "    if isinstance(router_results, list):\n", "        result = router_results[0]\n", "        result_text = result.text if hasattr(result, \"text\") else str(result)\n", "    else:\n", "        result_text = (\n", "            router_results.text if hasattr(router_results, \"text\") else str(router_results)\n", "        )\n", "    task_type = result_text.strip().lower()\n", "\n", "    return task_type\n", "\n", "\n", "def working_agent(task_type, user_input: str):\n", "    # Anything labelled 'research' goes to the analyst; everything else to the strategist\n", "    if \"research\" in task_type:\n", "        agent = research_analyst\n", "        label = \"Research Analyst\"\n", "    else:\n", "        agent = content_strategist\n", "        label = \"Content Strategist\"\n", "\n", "    work_task = Task(description=user_input, agent=agent, expected_output=\"Agent response\")\n", "    worker_crew = Crew(agents=[agent], tasks=[work_task], process=Process.sequential, verbose=True)\n", "    work_results = worker_crew.kickoff()\n", "    if isinstance(work_results, list):\n", "        output = work_results[0].text if hasattr(work_results[0], \"text\") else str(work_results[0])\n", "    else:\n", "        output = work_results.text if hasattr(work_results, \"text\") else str(work_results)\n", "\n", "    print(f\"\\n=== Routed to {label} ({task_type}) ===\\n{output}\\n\")\n", "    return output" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Example Runs" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Example runs: classify each query, then hand it to the matching agent\n", "for query in [\n", "    \"Please research the latest AI safety papers.\",\n", "    \"Outline an article on AI safety trends.\",\n", "]:\n", "    router_output = route(query, routerAgent)\n", "    task_output = type_of_task(router_output)\n", "    working_agent(task_output, query)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Option 2: Use the `@router()` decorator to define routing logic" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "nest_asyncio.apply()\n", "\n", "\n", "# Define Flow State\n", "class RoutingState(BaseModel):\n", "    query: str = \"\"\n", "    route: str = \"\"\n", "\n", "\n", "# Define Structured Flow\n", "class RoutingFlow(Flow[RoutingState]):\n", "    def __init__(self, query: str):\n", "        super().__init__(state=RoutingState(query=query))\n", "\n", "    @start()\n", "    def handle_query(self):\n", "        print(f\"📥 Incoming Query: {self.state.query}\")\n", "\n", "    @router(handle_query)\n", "    def decide_route(self):\n", "        # Keyword rule: queries mentioning 'research' go to the analyst\n", "        if \"research\" in self.state.query.lower():\n", "            self.state.route = \"research\"\n", "            return \"research\"\n", "        else:\n", "            self.state.route = \"outline\"\n", "            return \"outline\"\n", "\n", "    @listen(\"research\")\n", "    def run_research(self):\n", "        task = Task(\n", "            description=self.state.query,\n", "            expected_output=\"Summary of findings on the requested topic\",\n", "            agent=research_analyst,\n", "        )\n", "        crew = Crew(\n", "            agents=[research_analyst], tasks=[task], process=Process.sequential, verbose=True\n", "        )\n", "        crew.kickoff()\n", "\n", "    @listen(\"outline\")\n", "    def run_content_strategy(self):\n", "        task = Task(\n", "            description=self.state.query,\n", "            expected_output=\"An article outline about the given topic\",\n", "            agent=content_strategist,\n", "        )\n", "        crew = Crew(\n", "            agents=[content_strategist], tasks=[task], process=Process.sequential, verbose=True\n", "        )\n", "        crew.kickoff()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Example Runs" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "queries = [\n", "    \"Please research the latest AI safety papers.\",\n", "    \"Outline an article on AI safety trends.\",\n", "]\n", "\n", "for query in queries:\n", "    flow = RoutingFlow(query=query)\n", "    flow.kickoff()" ] },
{ "cell_type": "markdown", "metadata": { "id": "fH0uVMgxpLql" }, "source": [ "### Check your Phoenix project to view the traces and spans from your runs." ] }
], "metadata": { "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }
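
The routing decisions in both options of the notebook reduce to small string rules, which can be tried without API keys or CrewAI installed. The sketch below is an illustration only and is not part of the notebook: `normalize_label`, `pick_worker`, and `route_by_keyword` are hypothetical helper names, and it assumes the router's reply is available as a plain string (or an object with a `text` attribute, as the notebook checks for).

```python
# Hypothetical helpers (not in the notebook) mirroring its routing rules.


def normalize_label(raw_output: object) -> str:
    """Reduce a router reply to a lowercase, stripped string (as in type_of_task)."""
    # Prefer a `.text` attribute when present; otherwise fall back to str().
    text = raw_output.text if hasattr(raw_output, "text") else str(raw_output)
    return text.strip().lower()


def pick_worker(label: str) -> str:
    """Option 1 rule: labels mentioning 'research' go to the analyst,
    everything else goes to the content strategist."""
    return "Research Analyst" if "research" in label else "Content Strategist"


def route_by_keyword(query: str) -> str:
    """Option 2 rule (decide_route): keyword match on the query itself."""
    return "research" if "research" in query.lower() else "outline"


if __name__ == "__main__":
    # Simulated router replies, including stray whitespace and capitalization.
    for reply in ["  Research\n", "content"]:
        print(pick_worker(normalize_label(reply)))

    for query in [
        "Please research the latest AI safety papers.",
        "Outline an article on AI safety trends.",
    ]:
        print(route_by_keyword(query))
```

Keeping these rules as pure functions makes the fallback behaviour explicit: any label or query that does not contain "research" is sent to the content strategist, so an unexpected router reply never raises an error but may be routed to the wrong agent.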
