tracing_and_evals_weaviate.ipynb
{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "<center>\n", " <p style=\"text-align:center\">\n", " <img alt=\"phoenix logo\" src=\"https://storage.googleapis.com/arize-phoenix-assets/assets/phoenix-logo-light.svg\" width=\"200\"/>\n", " <br>\n", " <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n", " |\n", " <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n", " |\n", " <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-11t1vbu4x-xkBIHmOREQnYnYDH1GDfCg?__hstc=259489365.a667dfafcfa0169c8aee4178d115dc81.1733501603539.1733501603539.1733501603539.1&__hssc=259489365.1.1733501603539&__hsfp=3822854628&submissionGuid=381a0676-8f38-437b-96f2-fc10875658df#/shared-invite/email\">Community</a>\n", " </p>\n", "</center>\n", "\n", "# <center>Tracing and Evaluating a Weaviate RAG Pipeline</center>\n", "\n", "This guide walks through how you can trace and evaluate a Weaviate RAG Pipeline. Phoenix will allow you to capture traces on all calls made to Weaviate, and evaluate runs of a RAG pipeline built around the vector database.\n", "\n", "*Note: This is intended to demonstrate how to break down and manually instrument all the pieces of a RAG pipeline. Weaviate does have easier ways to run RAG pipelines, however a more manual approach has been chosen here for demonstration purposes.*\n", "\n", "⚠️ You'll need an OpenAI key for this guide" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dependencies and Keys" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install -q arize-phoenix weaviate weaviate-client openai openinference-instrumentation-openai" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This guide uses an online instance of [Phoenix](https://phoenix.arize.com), however if you'd prefer to self-host Phoenix, you can follow [these instructions](https://arize.com/docs/phoenix/deployment)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "from getpass import getpass\n", "\n", "os.environ[\"PHOENIX_API_KEY\"] = getpass(\"Enter your Phoenix API key: \")\n", "os.environ[\"PHOENIX_CLIENT_HEADERS\"] = f\"api_key={os.environ['PHOENIX_API_KEY']}\"\n", "os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = \"https://app.phoenix.arize.com\"\n", "\n", "os.environ[\"WEAVIATE_URL\"] = getpass(\"Enter your Weaviate API URL: \")\n", "os.environ[\"WEAVIATE_API_KEY\"] = getpass(\"Enter your Weaviate API key: \")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Connect to Weaviate\n", "\n", "Connect to your Weaviate Cloud instance. 
## Connect to Weaviate

Connect to your Weaviate Cloud instance. If you don't already have an instance, you can create one for free at https://auth.wcs.api.weaviate.io/auth/realms/SeMI/login-actions/registration

```python
import os

import weaviate
from weaviate.classes.init import Auth

# Best practice: store your credentials in environment variables
weaviate_url = os.environ["WEAVIATE_URL"]
weaviate_api_key = os.environ["WEAVIATE_API_KEY"]

client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

print(client.is_ready())  # Should print: True

# client.close()  # Free up resources
```

### Prepare your DB

If you haven't already created a collection in Weaviate, the code below will create an example collection for you:

```python
import os

import weaviate
from weaviate.classes.config import Configure
from weaviate.classes.init import Auth

# Best practice: store your credentials in environment variables
wcd_url = os.environ["WEAVIATE_URL"]
wcd_api_key = os.environ["WEAVIATE_API_KEY"]

client = weaviate.connect_to_weaviate_cloud(
    cluster_url=wcd_url,  # Replace with your Weaviate Cloud URL
    auth_credentials=Auth.api_key(wcd_api_key),  # Replace with your Weaviate Cloud key
)

questions = client.collections.create(
    name="Question",
    vectorizer_config=Configure.Vectorizer.text2vec_weaviate(),  # Configure the Weaviate Embeddings integration
    generative_config=Configure.Generative.cohere(),  # Configure the Cohere generative AI integration
)

client.close()  # Free up resources
```

Then load the tiny Jeopardy quickstart dataset into the collection:

```python
import json
import os

import requests
import weaviate
from weaviate.classes.init import Auth

# Best practice: store your credentials in environment variables
wcd_url = os.environ["WEAVIATE_URL"]
wcd_api_key = os.environ["WEAVIATE_API_KEY"]

client = weaviate.connect_to_weaviate_cloud(
    cluster_url=wcd_url,  # Replace with your Weaviate Cloud URL
    auth_credentials=Auth.api_key(wcd_api_key),  # Replace with your Weaviate Cloud key
)

resp = requests.get(
    "https://raw.githubusercontent.com/weaviate-tutorials/quickstart/main/data/jeopardy_tiny.json"
)
data = json.loads(resp.text)

questions = client.collections.get("Question")

with questions.batch.dynamic() as batch:
    for d in data:
        batch.add_object(
            {
                "answer": d["Answer"],
                "question": d["Question"],
                "category": d["Category"],
            }
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break

failed_objects = questions.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")

client.close()  # Free up resources
```
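As a quick sanity check (an illustrative addition, not part of the original notebook), you can count the imported objects; the tiny Jeopardy dataset has 10 rows. The cells above close their connections, so this sketch reconnects and deliberately leaves `client` open, since the pipeline functions below reuse it:

```python
import os

import weaviate
from weaviate.classes.init import Auth

client = weaviate.connect_to_weaviate_cloud(
    cluster_url=os.environ["WEAVIATE_URL"],
    auth_credentials=Auth.api_key(os.environ["WEAVIATE_API_KEY"]),
)

questions = client.collections.get("Question")

# Count objects in the collection; this should print 10 if every
# row of the quickstart dataset imported cleanly
total = questions.aggregate.over_all(total_count=True).total_count
print(f"Objects in 'Question': {total}")

# Intentionally no client.close() here: the RAG pipeline below reuses `client`
```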
## Build and Instrument your RAG pipeline using Weaviate and OpenInference

With Phoenix and Weaviate set up, you're now ready to build your pipeline.

```python
from phoenix.otel import register

phoenix_project_name = "weaviate-rag-pipeline"

# Because you've installed the openinference-instrumentation-openai package,
# the call below will auto-instrument OpenAI calls
tracer_provider = register(project_name=phoenix_project_name, auto_instrument=True)

# Retrieve a tracer for manual instrumentation
tracer = tracer_provider.get_tracer(__name__)
```

The following functions will build your RAG pipeline:
1. Query Weaviate for relevant document chunks
2. Format the retrieved data
3. Create a generation prompt with the retrieved data
4. Call your model with the generation prompt

Each function will also be instrumented using OpenInference and Phoenix.

```python
# Query a Weaviate collection with tracing
def query_weaviate(query_text, limit=3):
    # Start a span for the query
    with tracer.start_as_current_span(
        "query_weaviate", openinference_span_kind="retriever"
    ) as span:
        # Set the input for the span
        span.set_input(query_text)

        # Query the collection
        collection_name = "Question"
        chunks = client.collections.get(collection_name)
        results = chunks.query.near_text(query=query_text, limit=limit)

        # Set the retrieved documents as attributes on the span
        for i, document in enumerate(results.objects):
            span.set_attribute(f"retrieval.documents.{i}.document.id", str(document.uuid))
            span.set_attribute(f"retrieval.documents.{i}.document.metadata", str(document.metadata))
            span.set_attribute(
                f"retrieval.documents.{i}.document.content", str(document.properties)
            )

        return results
```

```python
# Process and format the retrieved results
@tracer.chain  # Creates a chain span for the function, analogous to the with tracer.start_as_current_span() block in query_weaviate
def format_context(results):
    context = ""
    for item in results.objects:
        properties = item.properties
        context += f"Question: {properties['question']}\n"
        context += f"Answer: {properties['answer']}\n"
        context += f"Category: {properties['category']}\n\n"
    return context
```

```python
# Create a prompt with the retrieved information
@tracer.chain
def create_prompt(query_text, context):
    prompt = f"""
Based on the following information, please answer the question: "{query_text}"

Context:
{context}

Please provide a comprehensive answer based on the information provided.
"""
    return prompt
```
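Before adding the generation step, you can sanity-check the retrieval half of the pipeline on its own. An illustrative aside (the query string is arbitrary):

```python
# Illustrative smoke test: retrieval, formatting, and prompt construction
# only, with no LLM call. Relies on the open Weaviate `client` from above.
sample_results = query_weaviate("animal facts", limit=2)
sample_prompt = create_prompt("animal facts", format_context(sample_results))
print(sample_prompt)
```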
```python
import os
from getpass import getpass

from openai import OpenAI

# The generation step needs an OpenAI key before the client is created
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API Key: ")

# Initialize OpenAI client
oa_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])


# Query OpenAI with the constructed prompt.
# This function does not have manual tracing applied to it, because the OpenAI
# client is instrumented via the auto_instrument flag in the register function.
def query_openai(prompt):
    response = oa_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt},
        ],
    )
    return response.choices[0].message.content
```

```python
@tracer.chain
def rag_pipeline(query):
    # Execute the retrieval query
    weaviate_results = query_weaviate(query)
    context = format_context(weaviate_results)
    print("Retrieved context:")
    print(context)

    # Create a prompt with the retrieved information
    final_prompt = create_prompt(query, context)

    # Execute the OpenAI query
    final_answer = query_openai(final_prompt)

    return final_answer
```

```python
query = "What is the only living mammal in the order Proboscidea?"

final_answer = rag_pipeline(query)

print("\nFinal Answer:")
print(final_answer)
```

## Evaluate your RAG System

Now that your RAG system is working, you can add evaluation metrics to both the retrieval and generation steps.

```python
import nest_asyncio

# Allow nested event loops so the evaluations below can run inside the notebook
nest_asyncio.apply()
```

```python
from openinference.instrumentation.openai import OpenAIInstrumentor

# Because you don't want to trace the OpenAI calls used for evaluation,
# uninstrument the OpenAI client
OpenAIInstrumentor().uninstrument()
```

```python
from phoenix.evals import OpenAIModel

# Initialize the OpenAI model you'll use for evaluation
eval_model = OpenAIModel(model="gpt-4o-mini")
```

```python
import phoenix as px
from phoenix.session.evaluation import get_retrieved_documents

# Get the retrieved documents from Phoenix using this helper function
retrieved_documents_df = get_retrieved_documents(px.Client(), project_name=phoenix_project_name)

retrieved_documents_df
```

```python
from phoenix.evals import RelevanceEvaluator, run_evals

# Initialize the built-in Relevance evaluator
relevance_evaluator = RelevanceEvaluator(eval_model)

# Run the evaluation
retrieved_documents_relevance_df = run_evals(
    evaluators=[relevance_evaluator],
    dataframe=retrieved_documents_df,
    provide_explanation=True,
    concurrency=20,
)[0]
```

```python
retrieved_documents_relevance_df.head()
```

```python
from phoenix.session.evaluation import get_qa_with_reference

# Get the question-and-answer pairs with reference data from Phoenix using this helper function
qa_with_reference_df = get_qa_with_reference(px.Client(), project_name=phoenix_project_name)
qa_with_reference_df
```

```python
from phoenix.evals import (
    HallucinationEvaluator,
    QAEvaluator,
    run_evals,
)

# Initialize the built-in Q&A evaluator
qa_evaluator = QAEvaluator(eval_model)

# Initialize the built-in Hallucination evaluator
hallucination_evaluator = HallucinationEvaluator(eval_model)

# Run the evaluation
qa_correctness_eval_df, hallucination_eval_df = run_evals(
    evaluators=[qa_evaluator, hallucination_evaluator],
    dataframe=qa_with_reference_df,
    provide_explanation=True,
    concurrency=20,
)
```
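Before logging anything, it can help to eyeball the aggregate results. A minimal sketch, assuming the `label` column that Phoenix's built-in evaluators emit alongside scores and explanations:

```python
# Assumes the `label` column produced by Phoenix's built-in evaluators
print(retrieved_documents_relevance_df["label"].value_counts())
print(qa_correctness_eval_df["label"].value_counts())
print(hallucination_eval_df["label"].value_counts())
```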
Finally, log the evaluation results back to Phoenix so they appear alongside your traces:

```python
from phoenix.client import AsyncClient
from phoenix.trace import DocumentEvaluations

px_client = AsyncClient()
await px_client.spans.log_span_annotations_dataframe(
    dataframe=qa_correctness_eval_df,
    annotation_name="Q&A Correctness",
    annotator_kind="LLM",
)
await px_client.spans.log_span_annotations_dataframe(
    dataframe=hallucination_eval_df,
    annotation_name="Hallucination",
    annotator_kind="LLM",
)

px.Client().log_evaluations(
    DocumentEvaluations(dataframe=retrieved_documents_relevance_df, eval_name="relevance"),
)
```

And just like that, you've now scored and evaluated your RAG pipeline!

![Weaviate Trace in Phoenix UI](https://storage.googleapis.com/arize-phoenix-assets/assets/images/weaviate-manual-nb-trace.png)
![Weaviate Traces in Phoenix UI](https://storage.googleapis.com/arize-phoenix-assets/assets/images/weaviate-manual-nb-traces.png)

From here, you can continue to tweak your pipeline to improve your scores. Or, if you're curious to learn more, check out some of our conceptual guides:
* [LLM Evaluations Hub](https://arize.com/llm-evaluation)
* [AI Agents Hub](https://arize.com/ai-agents/)
