{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<center>\n",
" <p style=\"text-align:center\">\n",
" <img alt=\"phoenix logo\" src=\"https://storage.googleapis.com/arize-phoenix-assets/assets/phoenix-logo-light.svg\" width=\"200\"/>\n",
" <br>\n",
" <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
" |\n",
" <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
" |\n",
" <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n",
" </p>\n",
"</center>\n",
"\n",
"**Credits:** [Mohit Talniya](https://www.linkedin.com/in/mohit-talniya-5917b3242/), Phoenix Ambassador \n",
"\n",
"# Multilingual Text2Cypher for Indian Languages\n",
"\n",
"Our evaluation pipeline uses Phoenix to provide comprehensive observability across the entire multilingual evaluation process. Starting with English text-to-Cypher pairs from Neo4j's dataset, we translate questions to target languages and test whether models can generate correct Cypher from non-English inputs. Here's what makes this approach powerful:\n",
"\n",
"End-to-end tracking of English → translation → Cypher generation chains\n",
"Translation quality assessment using back-translation and semantic similarity\n",
"Cross-lingual Cypher accuracy comparison against English ground truth\n",
"Rich metadata capture for detailed failure analysis\n",
"Automated evaluation uploads for team collaboration\n",
"\n",
"Let's get started!\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"collapsed": true,
"id": "tT38CouFrmJL",
"outputId": "b61c11d3-df0f-4db0-ec21-5d93ca22da52"
},
"outputs": [],
"source": [
"!pip install dspy-ai pandas datasets sentence-transformers openinference-instrumentation-dspy openinference-instrumentation opentelemetry-api opentelemetry-sdk litellm openinference-instrumentation-litellm arize-phoenix-otel arize-phoenix-client"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NpAlIL-h0IcH"
},
"source": [
"1. ***We begin by importing all necessary libraries.***"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "p9uOov-NtnRw"
},
"outputs": [],
"source": [
"import os\n",
"from getpass import getpass\n",
"\n",
"import dspy\n",
"import nest_asyncio\n",
"from datasets import load_dataset\n",
"from openinference.instrumentation import suppress_tracing\n",
"from openinference.instrumentation.dspy import DSPyInstrumentor\n",
"from openinference.semconv.trace import SpanAttributes\n",
"from opentelemetry.trace import Status, StatusCode, format_span_id\n",
"from sentence_transformers import SentenceTransformer, util\n",
"\n",
"from phoenix.client import Client\n",
"from phoenix.otel import register\n",
"\n",
"nest_asyncio.apply()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "BPtKPrEy0lZq"
},
"source": [
"# Global Configuration\n",
"Next, we define the main configuration parameters for our evaluation. This includes the size of the dataset to use and the list of languages we want to translate the questions into. We also define the language models that will be evaluated."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "jWlZEOSYt-Tt"
},
"outputs": [],
"source": [
"if not (phoenix_endpoint := os.getenv(\"PHOENIX_COLLECTOR_ENDPOINT\")):\n",
" phoenix_endpoint = getpass(\"🔑 Enter your Phoenix Collector Endpoint: \")\n",
"os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = phoenix_endpoint\n",
"\n",
"if not (phoenix_api_key := os.getenv(\"PHOENIX_API_KEY\")):\n",
" phoenix_api_key = getpass(\"🔑 Enter your Phoenix API key: \")\n",
"os.environ[\"PHOENIX_API_KEY\"] = phoenix_api_key\n",
"\n",
"\n",
"if not (openai_api_key := os.getenv(\"OPENAI_API_KEY\")):\n",
" openai_api_key = getpass(\"🔑 Enter your OpenAI API key: \")\n",
"os.environ[\"OPENAI_API_KEY\"] = openai_api_key\n",
"\n",
"PROJECT_NAME = \"neo4j-multilingual-eval-v5\"\n",
"\n",
"# Configuration for the evaluation run\n",
"CONFIG = {\"dataset_size\": 10, \"target_languages\": [\"Hindi\", \"Tamil\", \"Telugu\"]}\n",
"\n",
"# Define all models to be evaluated\n",
"# Note: Ensure the API base URL is correct for your local model server.\n",
"# For this notebook: We are using openAI, but actual impl was done with gpt-oss-20b\n",
"models_to_evaluate = [\n",
" # {\n",
" # \"name\": \"openai/gpt-oss-20b\",\n",
" # \"instance\": dspy.LM(model='openai/gpt-oss-20b', api_base=\"http://localhost:1234/v1\"),\n",
" # \"api_key_env\": \"OPENAI_API_KEY\"\n",
" # }\n",
" {\n",
" \"name\": \"openai/gpt-4o-mini\",\n",
" \"instance\": dspy.LM(model=\"openai/gpt-4o-mini\", api_key=os.environ[\"OPENAI_API_KEY\"]),\n",
" }\n",
"]"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ku_debTG1F4I"
},
"source": [
"# Define Initialization and Validation Functions\n",
"These helper functions will initialize the Phoenix tracer for observability and validate that the necessary API keys are available in the environment."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "7Q_-WVY7uQEy"
},
"outputs": [],
"source": [
"def init_phoenix():\n",
" \"\"\"Initialize Phoenix UI for observability.\"\"\"\n",
" tracer_provider = register(\n",
" project_name=PROJECT_NAME, endpoint=os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"]\n",
" )\n",
"\n",
" tracer = tracer_provider.get_tracer(__name__)\n",
" DSPyInstrumentor().instrument(tracer_provider=tracer_provider)\n",
"\n",
" # LiteLLMInstrumentor().instrument(tracer_provider=tracer_provider)\n",
"\n",
" print(\"DSPy instrumentation enabled\")\n",
"\n",
" # Phoenix register expects \"/v1/traces\" in the endpoint for local instance but the tracer register does not\n",
" base_url = os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"].replace(\"/v1/traces\", \"\")\n",
" client = Client(base_url=base_url)\n",
" return tracer, client"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "DEmirmDz10hL"
},
"source": [
"# Define Resource Loading Function\n",
"This function handles loading the text2cypher dataset from Hugging Face and the sentence-transformer model, which will be used to calculate semantic similarity for translation quality."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "M6lzR2V9v-ia"
},
"outputs": [],
"source": [
"def load_resources():\n",
" \"\"\"Load dataset and sentence model.\"\"\"\n",
" print(\"Loading text2cypher dataset\")\n",
" ds = load_dataset(\"neo4j/text2cypher-2024v1\")\n",
" train_samples = ds[\"train\"].select(range(CONFIG[\"dataset_size\"]))\n",
" print(f\"Dataset loaded with {len(train_samples)} samples\")\n",
"\n",
" print(\"Loading sentence-transformer model for semantic scoring\")\n",
" # Using all-MiniLM-L6-v2, a fast and effective model\n",
" sentence_model = SentenceTransformer(\"all-MiniLM-L6-v2\")\n",
" print(\"Sentence model loaded successfully\")\n",
"\n",
" return train_samples, sentence_model"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "wEWtIFiR2OvM"
},
"source": [
"# Define DSPy Signatures and Pipeline Module\n",
"Here we define the core components of our LLM pipeline using DSPy's signature system.\n",
"\n",
"* Translate: Translates a source text to a target language.\n",
"\n",
"* GenerateCypher: Creates a Cypher query from a natural language question and a database schema.\n",
"\n",
"* EvaluateCypherEquivalence: An LLM-based judge to assess if two Cypher queries are functionally equivalent.\n",
"\n",
"* FullPipelineEvaluator: A dspy.Module that chains the translation and Cypher generation steps together."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "xGm47GAKwJas"
},
"outputs": [],
"source": [
"class Translate(dspy.Signature):\n",
" \"\"\"Translates text into a target language.\"\"\"\n",
"\n",
" source_text = dspy.InputField(desc=\"The question in source language\")\n",
" target_language = dspy.InputField(desc=\"Name of the language\")\n",
" translated_text = dspy.OutputField(desc=\"The question in the target language.\")\n",
"\n",
"\n",
"class GenerateCypher(dspy.Signature):\n",
" \"\"\"Generates a Cypher query from a question and schema.\"\"\"\n",
"\n",
" question = dspy.InputField(desc=\"Natural Language Question\")\n",
" neo4j_schema = dspy.InputField(desc=\"Neo4J schema\")\n",
" cypher_query = dspy.OutputField(desc=\"Generated Cypher Query\")\n",
"\n",
"\n",
"class EvaluateCypherEquivalence(dspy.Signature):\n",
" \"\"\"Determines if two Cypher queries are functionally equivalent using categorical assessment.\"\"\"\n",
"\n",
" ground_truth_query = dspy.InputField(desc=\"The correct/reference Cypher query\")\n",
" generated_query = dspy.InputField(desc=\"The generated Cypher query to evaluate\")\n",
" natural_lang_query = dspy.InputField(desc=\"Natural Language Query in english\")\n",
" neo4j_schema = dspy.InputField(desc=\"Neo4j database schema for context\")\n",
" equivalence_category = dspy.OutputField(\n",
" desc=\"One of: EQUIVALENT, PARTIALLY_CORRECT, INCORRECT, SYNTAX_ERROR\"\n",
" )\n",
" reasoning = dspy.OutputField(desc=\"Brief explanation of the categorization\")\n",
"\n",
"\n",
"class FullPipelineEvaluator(dspy.Module):\n",
" \"\"\"A DSPy module that runs the translation and Cypher generation steps.\"\"\"\n",
"\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.translator = dspy.Predict(Translate)\n",
" self.cypher_generator = dspy.Predict(GenerateCypher)\n",
"\n",
" def forward(self, question, schema, target_language):\n",
" print(f\"Running pipeline for question: {question[:50]}...\")\n",
"\n",
" # 1. Get translation\n",
" t = self.translator(source_text=question, target_language=target_language)\n",
" print(f\"Translation completed: {t.translated_text[:50]}...\")\n",
"\n",
" # 2. Generate cypher from translated question\n",
" c = self.cypher_generator(question=t.translated_text, neo4j_schema=schema)\n",
" print(f\"Cypher generation completed: {c.cypher_query[:50]}...\")\n",
"\n",
" return dspy.Prediction(\n",
" translated_question=t.translated_text, generated_query=c.cypher_query\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "V7kyav1D2ucu"
},
"source": [
"# Define Evaluation Functions\n",
"These functions are responsible for scoring the outputs of the pipeline.\n",
"\n",
"* score_translation_quality: Uses back-translation and semantic similarity to score the quality of the translation.\n",
"\n",
"* compare_query_results_with_llm: Uses the EvaluateCypherEquivalence judge to score the correctness of the generated Cypher query.\n",
"\n",
"* safe_set_span_attributes: A utility to safely add attributes to OpenTelemetry spans for tracing."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "iDuYaqzawTFx"
},
"outputs": [],
"source": [
"def score_translation_quality(original_text, translated_text, back_translator, sentence_model):\n",
" \"\"\"Performs back-translation and returns a semantic similarity score.\"\"\"\n",
" print(\"Evaluating translation quality\")\n",
" with suppress_tracing():\n",
" try:\n",
" back_translation_result = back_translator(\n",
" source_text=translated_text, target_language=\"English\"\n",
" )\n",
" back_translated_text = back_translation_result.translated_text\n",
"\n",
" embedding_1 = sentence_model.encode(original_text, convert_to_tensor=True)\n",
" embedding_2 = sentence_model.encode(back_translated_text, convert_to_tensor=True)\n",
"\n",
" similarity_score = util.pytorch_cos_sim(embedding_1, embedding_2).item()\n",
" print(f\"Translation quality score: {similarity_score:.3f}\")\n",
"\n",
" return {\"score\": similarity_score, \"back_translated_text\": back_translated_text}\n",
" except Exception as e:\n",
" print(f\"Error in translation quality evaluation: {e}\")\n",
" return {\"score\": 0.0, \"back_translated_text\": \"\", \"error\": str(e)}\n",
"\n",
"\n",
"def compare_query_results_with_llm(\n",
" ground_truth_query, generated_query, schema, cypher_judge, natural_language_question\n",
"):\n",
" \"\"\"Compare queries using a categorical LLM judge.\"\"\"\n",
" print(\"Comparing queries using categorical LLM judge\")\n",
" try:\n",
" result = cypher_judge(\n",
" ground_truth_query=ground_truth_query,\n",
" generated_query=generated_query,\n",
" neo4j_schema=schema,\n",
" natural_lang_query=natural_language_question,\n",
" )\n",
"\n",
" category = result.equivalence_category.upper()\n",
"\n",
" if category == \"EQUIVALENT\":\n",
" correct, score = True, 1.0\n",
" elif category == \"PARTIALLY_CORRECT\":\n",
" correct, score = None, 0.5 # Partial credit\n",
" else: # INCORRECT, SYNTAX_ERROR, or other\n",
" correct, score = False, 0.0\n",
"\n",
" return {\n",
" \"correct\": correct,\n",
" \"category\": category,\n",
" \"score\": score,\n",
" \"reason\": result.reasoning,\n",
" }\n",
"\n",
" except Exception as e:\n",
" print(f\"Error in LLM judge evaluation: {e}\")\n",
" return {\n",
" \"correct\": False,\n",
" \"category\": \"ERROR\",\n",
" \"score\": 0.0,\n",
" \"reason\": f\"Judge evaluation error: {e}\",\n",
" }\n",
"\n",
"\n",
"def safe_set_span_attributes(span, attributes):\n",
" \"\"\"Safely set span attributes without causing warnings.\"\"\"\n",
" if span and span.is_recording():\n",
" try:\n",
" span.set_attributes(attributes)\n",
" except Exception as e:\n",
" print(f\"Could not set span attributes: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "YOx7Zrd73PIo"
},
"source": [
"# Define Core Sample Processing Function\n",
"This is the main orchestration function. For a single data sample, it runs the full pipeline, evaluates the results, and records everything in a custom OpenTelemetry span that will be visible in Phoenix."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "5WIo82vXweRb"
},
"outputs": [],
"source": [
"def process_and_evaluate_sample(\n",
" tracer,\n",
" sample,\n",
" evaluator,\n",
" back_translator,\n",
" sentence_model,\n",
" target_language,\n",
" cypher_judge,\n",
" sample_id,\n",
"):\n",
" \"\"\"Main function to process one sample, orchestrating all steps.\"\"\"\n",
" print(f\"Processing sample {sample_id} for language {target_language}\")\n",
"\n",
" with tracer.start_as_current_span(\n",
" f\"multilingual_evaluation_{target_language}\",\n",
" openinference_span_kind=\"chain\",\n",
" attributes={\"evaluation.sample_id\": str(sample_id)},\n",
" ) as eval_span:\n",
" try:\n",
" # Run the full translation and cypher generation pipeline\n",
" print(f\"Original Question (English): {sample['question']}\")\n",
" pipeline_result = evaluator(\n",
" question=sample[\"question\"],\n",
" schema=sample[\"schema\"],\n",
" target_language=target_language,\n",
" )\n",
" print(f\"Translated Question ({target_language}): {pipeline_result.translated_question}\")\n",
"\n",
" # Evaluate translation quality\n",
" translation_score_result = score_translation_quality(\n",
" sample[\"question\"],\n",
" pipeline_result.translated_question,\n",
" back_translator,\n",
" sentence_model,\n",
" )\n",
"\n",
" # Evaluate Cypher query correctness\n",
" cypher_assessment = compare_query_results_with_llm(\n",
" sample[\"cypher\"],\n",
" pipeline_result.generated_query,\n",
" sample[\"schema\"],\n",
" cypher_judge,\n",
" sample[\"question\"],\n",
" )\n",
"\n",
" # Set all evaluation results as attributes on the span\n",
" evaluation_attrs = {\n",
" \"evaluation.translation_score\": float(translation_score_result.get(\"score\", 0.0)),\n",
" \"evaluation.cypher_correct\": cypher_assessment.get(\"correct\"),\n",
" \"evaluation.cypher_category\": cypher_assessment.get(\"category\", \"\"),\n",
" \"evaluation.cypher_score\": cypher_assessment.get(\"score\", 0.0),\n",
" \"evaluation.cypher_reason\": cypher_assessment.get(\"reason\", \"\"),\n",
" \"evaluation.ground_truth_query\": sample[\"cypher\"],\n",
" \"evaluation.generated_query\": pipeline_result.generated_query,\n",
" \"evaluation.original_text\": sample[\"question\"],\n",
" \"evaluation.translated_text\": pipeline_result.translated_question,\n",
" SpanAttributes.INPUT_VALUE: pipeline_result.translated_question,\n",
" SpanAttributes.OUTPUT_VALUE: pipeline_result.generated_query,\n",
" }\n",
" safe_set_span_attributes(eval_span, evaluation_attrs)\n",
" eval_span.set_status(Status(StatusCode.OK))\n",
" span_id = format_span_id(eval_span.get_span_context().span_id)\n",
"\n",
" print(f\"Sample {sample_id} evaluation completed successfully\")\n",
" return pipeline_result, translation_score_result, cypher_assessment, span_id\n",
"\n",
" except Exception as e:\n",
" print(f\"Error processing sample {sample_id}: {e}\", exc_info=True)\n",
" safe_set_span_attributes(eval_span, {\"error\": str(e)})\n",
" eval_span.set_status(Status(StatusCode.ERROR, description=str(e)))\n",
" span_id = format_span_id(eval_span.get_span_context().span_id)\n",
" return (\n",
" None,\n",
" {\"score\": 0.0, \"error\": str(e)},\n",
" {\"correct\": False, \"reason\": f\"Pipeline error: {e}\"},\n",
" span_id,\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cx2cCDPw3jKL"
},
"source": [
"# Initialize and Run the Full Evaluation\n",
"This is the main execution block. We initialize Phoenix, validate keys, and load our data. Then, we loop through each model, each target language, and each sample in our dataset, calling the processing function defined above and collecting the results."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "Jn1OLTc93wEO",
"outputId": "194d0e65-71b9-47b1-8d9f-97325abbd162"
},
"outputs": [],
"source": [
"# 1. Initialize Phoenix and validate setup\n",
"tracer, client = init_phoenix()\n",
"\n",
"# 2. Load dataset and sentence model\n",
"train_samples, sentence_model = load_resources()\n",
"\n",
"all_results = []\n",
"\n",
"# 3. Main evaluation loop\n",
"for model_config in models_to_evaluate:\n",
" model_name = model_config[\"name\"]\n",
" llm_instance = model_config[\"instance\"]\n",
"\n",
" print(f\"--- Evaluating Model: {model_name} ---\")\n",
" dspy.settings.configure(lm=llm_instance)\n",
"\n",
" # Initialize DSPy components for this model\n",
" evaluator = FullPipelineEvaluator()\n",
" back_translator = dspy.Predict(Translate)\n",
" cypher_judge = dspy.Predict(EvaluateCypherEquivalence)\n",
"\n",
" for target_language in CONFIG[\"target_languages\"]:\n",
" print(f\"--- Testing language: {target_language} ---\")\n",
"\n",
" for i, sample in enumerate(train_samples):\n",
" print(f\"Processing sample {i + 1}/{len(train_samples)} for {target_language}\")\n",
"\n",
" pipeline_result, translation_score, cypher_assessment, span_id = (\n",
" process_and_evaluate_sample(\n",
" tracer=tracer,\n",
" sample=sample,\n",
" evaluator=evaluator,\n",
" back_translator=back_translator,\n",
" sentence_model=sentence_model,\n",
" target_language=target_language,\n",
" cypher_judge=cypher_judge,\n",
" sample_id=i,\n",
" )\n",
" )\n",
"\n",
" # Add annotations to the span in Phoenix\n",
" try:\n",
" client.annotations.add_span_annotation(\n",
" annotation_name=\"translation_score\",\n",
" annotator_kind=\"LLM\",\n",
" span_id=span_id,\n",
" label=\"translation\",\n",
" score=float(translation_score.get(\"score\", 0.0)),\n",
" )\n",
" client.annotations.add_span_annotation(\n",
" annotation_name=\"cypher_score\",\n",
" annotator_kind=\"LLM\",\n",
" span_id=span_id,\n",
" label=\"cypher\",\n",
" score=cypher_assessment.get(\"score\", 0.0),\n",
" metadata={\"category\": cypher_assessment.get(\"category\", \"\")},\n",
" )\n",
" except Exception as e:\n",
" print(f\"Failed to push annotations for span {span_id}: {e}\")\n",
"\n",
" # Store results for final summary report\n",
" result = {\n",
" \"model_name\": model_name,\n",
" \"sample_id\": i,\n",
" \"question\": sample[\"question\"],\n",
" \"target_language\": target_language,\n",
" \"translation_score\": translation_score.get(\"score\", 0.0),\n",
" \"is_correct\": cypher_assessment.get(\"correct\"),\n",
" \"cypher_category\": cypher_assessment.get(\"category\"),\n",
" \"cypher_score\": cypher_assessment.get(\"score\", 0.0),\n",
" \"reason\": cypher_assessment.get(\"reason\"),\n",
" \"has_error\": \"error\" in translation_score or \"error\" in cypher_assessment,\n",
" }\n",
" if pipeline_result:\n",
" result.update(\n",
" {\n",
" \"translated_question\": pipeline_result.translated_question,\n",
" \"generated_query\": pipeline_result.generated_query,\n",
" }\n",
" )\n",
" all_results.append(result)\n",
"\n",
"print(\"--- Full evaluation run complete. ---\")"
]
},
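{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Summary Report\n",
"Finally, the per-sample results collected in `all_results` can be aggregated into a summary table. The cell below is a minimal sketch, assuming pandas (installed at the top of this notebook): it groups the results by model and target language and reports average translation and Cypher scores along with error counts."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# Build a DataFrame from the per-sample results collected above\n",
"results_df = pd.DataFrame(all_results)\n",
"\n",
"# Average scores and error counts per model and target language\n",
"summary = (\n",
"    results_df.groupby([\"model_name\", \"target_language\"])\n",
"    .agg(\n",
"        avg_translation_score=(\"translation_score\", \"mean\"),\n",
"        avg_cypher_score=(\"cypher_score\", \"mean\"),\n",
"        errors=(\"has_error\", \"sum\"),\n",
"        samples=(\"sample_id\", \"count\"),\n",
"    )\n",
"    .reset_index()\n",
")\n",
"\n",
"print(summary.to_string(index=False))"
]
}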
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}