# modal_evaluate_answers.py
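"""Evaluate pre-generated answer JSON files in parallel on Modal.

Each local JSON answers file is shipped to a remote Modal function that runs the
cognee evaluation pipeline on it and writes metrics, aggregate metrics, the
evaluated answers, and (optionally) an HTML dashboard to the shared
"comparison-eval-answers" volume. Typically launched with
`modal run modal_evaluate_answers.py`, which invokes the `main` local entrypoint below.
"""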
import modal
import os
import asyncio
import datetime
import json
from cognee.shared.logging_utils import get_logger
from cognee.eval_framework.eval_config import EvalConfig
from cognee.eval_framework.evaluation.run_evaluation_module import run_evaluation
from cognee.eval_framework.metrics_dashboard import create_dashboard
logger = get_logger()

vol = modal.Volume.from_name("comparison-eval-answers", create_if_missing=True)
app = modal.App("comparison-eval-answers")
image = (
    modal.Image.from_dockerfile(path="Dockerfile_modal", force_build=False)
    .copy_local_file("pyproject.toml", "pyproject.toml")
    .copy_local_file("poetry.lock", "poetry.lock")
    .env(
        {
            "ENV": os.getenv("ENV"),
            "LLM_API_KEY": os.getenv("LLM_API_KEY"),
            "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"),
        }
    )
    .pip_install("protobuf", "h2", "deepeval", "gdown", "plotly")
)
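
# Remote worker: runs in a Modal container with the shared volume mounted at /data.
# concurrency_limit caps the number of parallel containers, and timeout=86400 allows
# up to 24 hours per answers file.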
@app.function(image=image, concurrency_limit=10, timeout=86400, volumes={"/data": vol})
async def modal_evaluate_answers(
    answers_json_content: dict, answers_filename: str, eval_config: dict = None
):
    """Evaluates answers from JSON content and returns metrics results."""
    if eval_config is None:
        eval_config = EvalConfig().to_dict()

    timestamp = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")

    # Create temporary file path for the JSON content
    base_name = os.path.splitext(answers_filename)[0]
    temp_answers_path = f"/data/temp_answers_{base_name}_{timestamp}.json"

    # Write JSON content to temporary file
    with open(temp_answers_path, "w") as f:
        json.dump(answers_json_content, f, ensure_ascii=False, indent=4)

    # Set up output paths with simplified naming: prefix_original_file_name
    eval_params = eval_config.copy()
    eval_params["answers_path"] = temp_answers_path
    eval_params["metrics_path"] = f"/data/metrics_{answers_filename}"
    eval_params["aggregate_metrics_path"] = f"/data/aggregate_metrics_{answers_filename}"
    eval_params["dashboard_path"] = f"/data/dashboard_{base_name}.html"
    # eval_params["evaluation_engine"] = "DirectLLM"
    # eval_params["evaluation_metrics"] = ["correctness"]

    logger.info(f"Evaluating answers from: {answers_filename}")
    logger.info(f"Using eval params: {eval_params}")

    try:
        # Only run evaluation (skip corpus building and question answering)
        evaluated_answers = await run_evaluation(eval_params)

        # Save evaluated answers
        evaluated_answers_path = f"/data/evaluated_{answers_filename}"
        with open(evaluated_answers_path, "w") as f:
            json.dump(evaluated_answers, f, ensure_ascii=False, indent=4)
        vol.commit()

        # Generate dashboard if requested
        if eval_params.get("dashboard"):
            logger.info("Generating dashboard...")
            html_output = create_dashboard(
                metrics_path=eval_params["metrics_path"],
                aggregate_metrics_path=eval_params["aggregate_metrics_path"],
                output_file=eval_params["dashboard_path"],
                benchmark=eval_params.get("benchmark", "Unknown"),
            )
            with open(eval_params["dashboard_path"], "w") as f:
                f.write(html_output)
            vol.commit()

        logger.info(f"Evaluation completed for {answers_filename}")

        # Return metrics results
        result = {
            "answers_file": answers_filename,
            "metrics_path": eval_params["metrics_path"],
            "aggregate_metrics_path": eval_params["aggregate_metrics_path"],
            "dashboard_path": eval_params["dashboard_path"]
            if eval_params.get("dashboard")
            else None,
            "evaluated_answers_path": evaluated_answers_path,
        }
        return result
    except Exception as e:
        logger.error(f"Error evaluating {answers_filename}: {e}")
        raise
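
# Local entrypoint: reads the answer JSON files on the local machine and fans the
# evaluation out across Modal, one remote call per file.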
@app.local_entrypoint()
async def main():
    """Main entry point that evaluates multiple JSON answer files in parallel."""
    json_files_dir = ""  # Set this to the local directory containing the answer JSON files
    json_files = [f for f in os.listdir(json_files_dir) if f.endswith(".json")]
    json_file_paths = [os.path.join(json_files_dir, f) for f in json_files]

    # Manually specify your evaluation configuration here
    eval_config = EvalConfig(
        # Only evaluation-related settings
        evaluating_answers=True,
        evaluating_contexts=False,
        evaluation_engine="DeepEval",
        evaluation_metrics=["correctness", "EM", "f1"],
        calculate_metrics=True,
        dashboard=True,
        deepeval_model="gpt-5-mini",
    ).to_dict()

    logger.info(f"Starting evaluation of {len(json_file_paths)} JSON files")

    # Read JSON files locally and prepare tasks; track filenames alongside tasks
    # so result indices still line up when unreadable files are skipped
    modal_tasks = []
    task_filenames = []
    for json_path in json_file_paths:
        try:
            # Read JSON content locally
            with open(json_path, "r", encoding="utf-8") as f:
                json_content = json.load(f)
            filename = os.path.basename(json_path)

            # Create remote evaluation task with JSON content
            task = modal_evaluate_answers.remote.aio(json_content, filename, eval_config)
            modal_tasks.append(task)
            task_filenames.append(filename)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logger.error(f"Error reading {json_path}: {e}")
            continue

    if not modal_tasks:
        logger.error("No valid JSON files found to process")
        return []

    # Run evaluations in parallel
    results = await asyncio.gather(*modal_tasks, return_exceptions=True)

    # Log results
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            logger.error(f"Failed to evaluate {task_filenames[i]}: {result}")
        else:
            logger.info(f"Successfully evaluated {result['answers_file']}")
    return results