{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<center>\n",
" <p style=\"text-align:center\">\n",
" <img alt=\"phoenix logo\" src=\"https://storage.googleapis.com/arize-phoenix-assets/assets/phoenix-logo-light.svg\" width=\"200\"/>\n",
" <br>\n",
" <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
" |\n",
" <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
" |\n",
" <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-11t1vbu4x-xkBIHmOREQnYnYDH1GDfCg?__hstc=259489365.a667dfafcfa0169c8aee4178d115dc81.1733501603539.1733501603539.1733501603539.1&__hssc=259489365.1.1733501603539&__hsfp=3822854628&submissionGuid=381a0676-8f38-437b-96f2-fc10875658df#/shared-invite/email\">Community</a>\n",
" </p>\n",
"</center>\n",
"\n",
"# <center>Tracing and Evaluating a Weaviate RAG Pipeline</center>\n",
"\n",
"This guide walks through how you can trace and evaluate a Weaviate RAG Pipeline. Phoenix will allow you to capture traces on all calls made to Weaviate, and evaluate runs of a RAG pipeline built around the vector database.\n",
"\n",
"*Note: This is intended to demonstrate how to break down and manually instrument all the pieces of a RAG pipeline. Weaviate does have easier ways to run RAG pipelines, however a more manual approach has been chosen here for demonstration purposes.*\n",
"\n",
"⚠️ You'll need an OpenAI key for this guide"
]
},
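{
"cell_type": "markdown",
"metadata": {},
"source": [
"*For reference, Weaviate's built-in generative search can run retrieval and generation in a single call. The sketch below is illustrative only (it assumes the connected `client` and the `Question` collection created later in this guide) and is not part of the pipeline built here:*\n",
"\n",
"```python\n",
"# Illustrative sketch of Weaviate's built-in generative search (not used in this guide).\n",
"# Assumes the connected `client` and the \"Question\" collection created later in this notebook.\n",
"questions = client.collections.get(\"Question\")\n",
"response = questions.generate.near_text(\n",
"    query=\"biology\",\n",
"    limit=2,\n",
"    grouped_task=\"Answer the user's question using only these facts.\",\n",
")\n",
"print(response.generated)\n",
"```"
]
},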
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dependencies and Keys"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install -q arize-phoenix weaviate weaviate-client openai openinference-instrumentation-openai"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This guide uses an online instance of [Phoenix](https://phoenix.arize.com), however if you'd prefer to self-host Phoenix, you can follow [these instructions](https://arize.com/docs/phoenix/deployment)"
]
},
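{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you'd rather run Phoenix locally, a minimal sketch (assuming a self-hosted instance on Phoenix's default port 6006) looks like the following; in that case, point `PHOENIX_COLLECTOR_ENDPOINT` at your local instance instead of the cloud endpoint set in the next cell:\n",
"\n",
"```python\n",
"# Optional: launch a local Phoenix instance instead of using the hosted app\n",
"# (a sketch assuming the default local port; skip the cloud endpoint setup below if you use this)\n",
"import os\n",
"\n",
"import phoenix as px\n",
"\n",
"px.launch_app()  # serves the Phoenix UI at http://localhost:6006\n",
"os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = \"http://localhost:6006\"\n",
"```"
]
},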
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from getpass import getpass\n",
"\n",
"os.environ[\"PHOENIX_API_KEY\"] = getpass(\"Enter your Phoenix API key: \")\n",
"os.environ[\"PHOENIX_CLIENT_HEADERS\"] = f\"api_key={os.environ['PHOENIX_API_KEY']}\"\n",
"os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = \"https://app.phoenix.arize.com\"\n",
"\n",
"os.environ[\"WEAVIATE_URL\"] = getpass(\"Enter your Weaviate API URL: \")\n",
"os.environ[\"WEAVIATE_API_KEY\"] = getpass(\"Enter your Weaviate API key: \")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Connect to Weaviate\n",
"\n",
"Connect to your Weaviate Cloud instance. If you don't already have an instance, you can create one for free at https://auth.wcs.api.weaviate.io/auth/realms/SeMI/login-actions/registration"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"import weaviate\n",
"from weaviate.classes.init import Auth\n",
"\n",
"# Best practice: store your credentials in environment variables\n",
"weaviate_url = os.environ[\"WEAVIATE_URL\"]\n",
"weaviate_api_key = os.environ[\"WEAVIATE_API_KEY\"]\n",
"\n",
"client = weaviate.connect_to_weaviate_cloud(\n",
" cluster_url=weaviate_url,\n",
" auth_credentials=Auth.api_key(weaviate_api_key),\n",
")\n",
"\n",
"print(client.is_ready()) # Should print: `True`\n",
"\n",
"# client.close() # Free up resources"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Prepare your DB\n",
"If you haven't already created a collection in Weaviate, the code below will create an example collection for you:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"import weaviate\n",
"from weaviate.classes.config import Configure\n",
"from weaviate.classes.init import Auth\n",
"\n",
"# Best practice: store your credentials in environment variables\n",
"wcd_url = os.environ[\"WEAVIATE_URL\"]\n",
"wcd_api_key = os.environ[\"WEAVIATE_API_KEY\"]\n",
"\n",
"client = weaviate.connect_to_weaviate_cloud(\n",
" cluster_url=wcd_url, # Replace with your Weaviate Cloud URL\n",
" auth_credentials=Auth.api_key(wcd_api_key), # Replace with your Weaviate Cloud key\n",
")\n",
"\n",
"questions = client.collections.create(\n",
" name=\"Question\",\n",
" vectorizer_config=Configure.Vectorizer.text2vec_weaviate(), # Configure the Weaviate Embeddings integration\n",
" generative_config=Configure.Generative.cohere(), # Configure the Cohere generative AI integration\n",
")\n",
"\n",
"client.close() # Free up resources"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import os\n",
"\n",
"import requests\n",
"import weaviate\n",
"from weaviate.classes.init import Auth\n",
"\n",
"# Best practice: store your credentials in environment variables\n",
"wcd_url = os.environ[\"WEAVIATE_URL\"]\n",
"wcd_api_key = os.environ[\"WEAVIATE_API_KEY\"]\n",
"\n",
"client = weaviate.connect_to_weaviate_cloud(\n",
" cluster_url=wcd_url, # Replace with your Weaviate Cloud URL\n",
" auth_credentials=Auth.api_key(wcd_api_key), # Replace with your Weaviate Cloud key\n",
")\n",
"\n",
"resp = requests.get(\n",
" \"https://raw.githubusercontent.com/weaviate-tutorials/quickstart/main/data/jeopardy_tiny.json\"\n",
")\n",
"data = json.loads(resp.text)\n",
"\n",
"questions = client.collections.get(\"Question\")\n",
"\n",
"with questions.batch.dynamic() as batch:\n",
" for d in data:\n",
" batch.add_object(\n",
" {\n",
" \"answer\": d[\"Answer\"],\n",
" \"question\": d[\"Question\"],\n",
" \"category\": d[\"Category\"],\n",
" }\n",
" )\n",
" if batch.number_errors > 10:\n",
" print(\"Batch import stopped due to excessive errors.\")\n",
" break\n",
"\n",
"failed_objects = questions.batch.failed_objects\n",
"if failed_objects:\n",
" print(f\"Number of failed imports: {len(failed_objects)}\")\n",
" print(f\"First failed object: {failed_objects[0]}\")\n",
"\n",
"client.close() # Free up resources"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Build and Instrument your RAG pipeline using Weaviate and OpenInference\n",
"\n",
"With Phoenix and Weaviate set up, you're now ready to build your pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.otel import register\n",
"\n",
"phoenix_project_name = \"weaviate-rag-pipeline\"\n",
"\n",
"# Because you've install the openinference openai package, the call below will auto-instrument OpenAI calls\n",
"tracer_provider = register(project_name=phoenix_project_name, auto_instrument=True)\n",
"\n",
"# Retrieve a tracer for manual instrumentation\n",
"tracer = tracer_provider.get_tracer(__name__)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following functions will build your RAG pipeline:\n",
"1. Query Weaviate for relevant document chunks\n",
"2. Format the retrieved data\n",
"3. Create a generation prompt with the retrieved data\n",
"4. Call your model with the generation prompt\n",
"\n",
"Each function will also be instrumented using OpenInference and Phoenix"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Query a Weaviate collection with tracing\n",
"def query_weaviate(query_text, limit=3):\n",
" # Start a span for the query\n",
" with tracer.start_as_current_span(\n",
" \"query_weaviate\", openinference_span_kind=\"retriever\"\n",
" ) as span:\n",
" # Set the input for the span\n",
" span.set_input(query_text)\n",
"\n",
" # Query the collection\n",
" collection_name = \"Question\"\n",
" chunks = client.collections.get(collection_name)\n",
" results = chunks.query.near_text(query=query_text, limit=limit)\n",
"\n",
" # Set the retrieved documents as attributes on the span\n",
" for i, document in enumerate(results.objects):\n",
" span.set_attribute(f\"retrieval.documents.{i}.document.id\", str(document.uuid))\n",
" span.set_attribute(f\"retrieval.documents.{i}.document.metadata\", str(document.metadata))\n",
" span.set_attribute(\n",
" f\"retrieval.documents.{i}.document.content\", str(document.properties)\n",
" )\n",
"\n",
" return results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Process and format the retrieved results\n",
"@tracer.chain # This will create a chain span for the function, same as the with tracer.start_as_current_span() in the query_weaviate function\n",
"def format_context(results):\n",
" context = \"\"\n",
" for item in results.objects:\n",
" properties = item.properties\n",
" context += f\"Question: {properties['question']}\\n\"\n",
" context += f\"Answer: {properties['answer']}\\n\"\n",
" context += f\"Category: {properties['category']}\\n\\n\"\n",
" return context"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create a prompt with the retrieved information\n",
"@tracer.chain\n",
"def create_prompt(query_text, context):\n",
" prompt = f\"\"\"\n",
"Based on the following information, please answer the question: \"{query_text}\"\n",
"\n",
"Context:\n",
"{context}\n",
"\n",
"Please provide a comprehensive answer based on the information provided.\n",
"\"\"\"\n",
" return prompt"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from openai import OpenAI\n",
"\n",
"# Initialize OpenAI client\n",
"oa_client = OpenAI(api_key=os.environ.get(\"OPENAI_API_KEY\"))\n",
"\n",
"\n",
"# Query OpenAI with the constructed prompt.\n",
"# This function does not have tracing applied to it, because the OpenAI\n",
"# client is instrumented using the auto_instrument flag in the register function.\n",
"def query_openai(prompt):\n",
" response = oa_client.chat.completions.create(\n",
" model=\"gpt-4o-mini\",\n",
" messages=[\n",
" {\"role\": \"system\", \"content\": \"You are a helpful assistant.\"},\n",
" {\"role\": \"user\", \"content\": prompt},\n",
" ],\n",
" )\n",
" return response.choices[0].message.content"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@tracer.chain\n",
"def rag_pipeline(query):\n",
" # Execute the query\n",
" weaviate_results = query_weaviate(query)\n",
" context = format_context(weaviate_results)\n",
" print(\"Retrieved context:\")\n",
" print(context)\n",
"\n",
" # Create a prompt with the retrieved information\n",
" final_prompt = create_prompt(query, context)\n",
"\n",
" # Execute the OpenAI query\n",
" final_answer = query_openai(final_prompt)\n",
"\n",
" return final_answer"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"query = \"What is the only living mammal in the order Proboseidea?\"\n",
"\n",
"final_answer = rag_pipeline(query)\n",
"\n",
"print(\"\\nFinal Answer:\")\n",
"print(final_answer)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Evaluate your RAG System\n",
"\n",
"Now with your RAG system working, you can add evaluation metrics to both the retrieval and generation steps."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"os.environ[\"OPENAI_API_KEY\"] = getpass(\"Enter your OpenAI API Key\")\n",
"\n",
"import nest_asyncio\n",
"\n",
"nest_asyncio.apply()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from openinference.instrumentation.openai import OpenAIInstrumentor\n",
"\n",
"# Because you don't want to trace the OpenAI calls used for evaluation, you can uninstrument the OpenAI client\n",
"OpenAIInstrumentor().uninstrument()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.evals import OpenAIModel\n",
"\n",
"# Initialize the OpenAI model you'll use for evaluation\n",
"eval_model = OpenAIModel(model=\"gpt-4o-mini\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import phoenix as px\n",
"from phoenix.session.evaluation import get_retrieved_documents\n",
"\n",
"# Get the retrieved documents from Phoenix using this helper function\n",
"retrieved_documents_df = get_retrieved_documents(px.Client(), project_name=phoenix_project_name)\n",
"\n",
"retrieved_documents_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.evals import RelevanceEvaluator, run_evals\n",
"\n",
"# Initialize the built in Relevance evaluator\n",
"relevance_evaluator = RelevanceEvaluator(eval_model)\n",
"\n",
"# Run the evaluation\n",
"retrieved_documents_relevance_df = run_evals(\n",
" evaluators=[relevance_evaluator],\n",
" dataframe=retrieved_documents_df,\n",
" provide_explanation=True,\n",
" concurrency=20,\n",
")[0]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"retrieved_documents_relevance_df.head()"
]
},
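{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check, you can also summarize the relevance labels across all retrieved documents. A minimal sketch, assuming the `label` column produced by `run_evals` above:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Rough summary of how often retrieved documents were judged relevant\n",
"# (assumes the \"label\" column produced by run_evals above)\n",
"print(retrieved_documents_relevance_df[\"label\"].value_counts(normalize=True))"
]
},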
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.session.evaluation import get_qa_with_reference\n",
"\n",
"# Get the Question and Answer with reference data from Phoenix using this helper function\n",
"qa_with_reference_df = get_qa_with_reference(px.Client(), project_name=phoenix_project_name)\n",
"qa_with_reference_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.evals import (\n",
" HallucinationEvaluator,\n",
" QAEvaluator,\n",
" run_evals,\n",
")\n",
"\n",
"# Initialize the built in Q&A evaluator\n",
"qa_evaluator = QAEvaluator(eval_model)\n",
"\n",
"# Initialize the built in Hallucination evaluator\n",
"hallucination_evaluator = HallucinationEvaluator(eval_model)\n",
"\n",
"# Run the evaluation\n",
"qa_correctness_eval_df, hallucination_eval_df = run_evals(\n",
" evaluators=[qa_evaluator, hallucination_evaluator],\n",
" dataframe=qa_with_reference_df,\n",
" provide_explanation=True,\n",
" concurrency=20,\n",
")"
]
},
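{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before logging these results to Phoenix, you can take a quick look at the Q&A correctness and hallucination results (both dataframes come from the `run_evals` call above):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Quick peek at the generation-level eval results before logging them to Phoenix\n",
"display(qa_correctness_eval_df.head())\n",
"display(hallucination_eval_df.head())"
]
},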
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from phoenix.client import AsyncClient\n",
"from phoenix.trace import DocumentEvaluations\n",
"\n",
"px_client = AsyncClient()\n",
"await px_client.spans.log_span_annotations_dataframe(\n",
" dataframe=qa_correctness_eval_df,\n",
" annotation_name=\"Q&A Correctness\",\n",
" annotator_kind=\"LLM\",\n",
")\n",
"await px_client.spans.log_span_annotations_dataframe(\n",
" dataframe=hallucination_eval_df,\n",
" annotation_name=\"Hallucination\",\n",
" annotator_kind=\"LLM\",\n",
")\n",
"\n",
"px.Client().log_evaluations(\n",
" DocumentEvaluations(dataframe=retrieved_documents_relevance_df, eval_name=\"relevance\"),\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And just like that, you've now scored and evaluated your RAG pipeline!\n",
"\n",
"# \n",
"# \n",
"\n",
"From here, you can continue to tweak your pipeline to improve your scores. Or if you're curious to learn more, check out some of our conceptual guides:\n",
"* [LLM Evaluations Hub](https://arize.com/llm-evaluation)\n",
"* [AI Agents Hub](https://arize.com/ai-agents/)"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}