#!/usr/bin/env python3
"""
Cross-benchmark analysis orchestration script.
Downloads qa-benchmarks volume and processes each benchmark folder.
"""
import os
import subprocess
import sys
from pathlib import Path
import pandas as pd
from analysis.analyze_single_benchmark import analyze_single_benchmark_folder
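# NOTE: assumes the Modal CLI (`pip install modal`) is installed and authenticated,
# and that the local `analysis` package (analyze_single_benchmark) is importable
# from the directory this script is run from.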


def download_modal_volume(volume_name: str, download_path: str) -> None:
"""Download entire modal volume to local directory."""
print(f"📥 Downloading modal volume: {volume_name}")
# Create download directory if it doesn't exist
Path(download_path).mkdir(parents=True, exist_ok=True)
original_dir = os.getcwd()
os.chdir(download_path)
try:
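        # `modal volume get <volume> /` pulls the whole volume tree; because the
        # working directory was switched to download_path, the files land there.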
cmd = ["modal", "volume", "get", volume_name, "/"]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("✅ Successfully downloaded modal volume")
else:
print(f"❌ Error downloading volume: {result.stderr}")
sys.exit(1)
finally:
os.chdir(original_dir)


def get_benchmark_folders(volume_path: str) -> list:
"""Get list of benchmark folders from downloaded volume."""
volume_dir = Path(volume_path)
if not volume_dir.exists():
print(f"❌ Volume directory does not exist: {volume_path}")
return []
benchmark_folders = []
for item in volume_dir.iterdir():
if item.is_dir() and not item.name.startswith("."):
benchmark_folders.append(item.name)
print(f"📁 Found {len(benchmark_folders)} benchmark folders")
return sorted(benchmark_folders)


def check_evaluated_folder_exists(benchmark_path: str) -> bool:
"""Check if evaluated folder exists and contains JSON files."""
evaluated_path = Path(benchmark_path) / "evaluated"
if not evaluated_path.exists():
print(f"⚠️ No evaluated folder found: {evaluated_path}")
return False
json_files = list(evaluated_path.glob("*.json"))
if not json_files:
print(f"⚠️ No JSON files found in evaluated folder: {evaluated_path}")
return False
print(f"✅ Found {len(json_files)} JSON files in evaluated folder")
return True


def check_analysis_files_exist(benchmark_path: str) -> bool:
"""Check if analysis files already exist for this benchmark."""
analysis_path = Path(benchmark_path) / "analysis"
if not analysis_path.exists():
return False
# Check for any CSV files in analysis folder
csv_files = list(analysis_path.glob("*.csv"))
return len(csv_files) > 0


def create_analysis_folder(benchmark_path: str) -> str:
"""Create analysis folder for benchmark if it doesn't exist."""
analysis_path = Path(benchmark_path) / "analysis"
analysis_path.mkdir(parents=True, exist_ok=True)
return str(analysis_path)


def process_single_benchmark(benchmark_folder: str, volume_path: str) -> str:
    """Process a single benchmark folder; return 'processed', 'skipped', or 'failed'."""
    benchmark_path = Path(volume_path) / benchmark_folder
    print(f"\n🔄 Processing benchmark: {benchmark_folder}")
    # Skip benchmarks that have no evaluated results to analyze
    if not check_evaluated_folder_exists(str(benchmark_path)):
        return "skipped"
    # Skip benchmarks that were already analyzed in a previous run
    if check_analysis_files_exist(str(benchmark_path)):
        print(f"⏭️ Analysis files already exist, skipping: {benchmark_folder}")
        return "skipped"
    try:
        # Make sure the output folder exists, then run the per-benchmark analysis
        create_analysis_folder(str(benchmark_path))
        analyze_single_benchmark_folder(str(benchmark_path))
        print(f"✅ Successfully processed: {benchmark_folder}")
        return "processed"
    except Exception as e:
        print(f"❌ Error processing {benchmark_folder}: {e}")
        return "failed"


def print_progress_update(current: int, total: int) -> None:
"""Print progress update every 1/5 of total."""
if current % max(1, total // 5) == 0:
print(f"📊 Progress: {current}/{total} benchmarks processed")


def process_all_benchmarks(volume_path: str) -> dict:
"""Process all benchmark folders with progress tracking."""
benchmark_folders = get_benchmark_folders(volume_path)
results = {"processed": [], "skipped": [], "failed": []}
if not benchmark_folders:
print("❌ No benchmark folders found")
return results
print(f"\n🚀 Starting analysis of {len(benchmark_folders)} benchmarks")
    for i, folder in enumerate(benchmark_folders):
        status = process_single_benchmark(folder, volume_path)
        results[status].append(folder)
        print_progress_update(i + 1, len(benchmark_folders))
return results


def create_cross_benchmark_summary(volume_path: str, results: dict) -> None:
"""Create a summary CSV with average metrics from all processed benchmarks."""
print("\n📊 Creating cross-benchmark summary...")
summary_data = []
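    # Metrics expected to appear as "<metric>_mean" columns in each benchmark's
    # analysis/metrics_aggregate.csv written by the single-benchmark analysis.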
metrics = ["directllm_correctness", "deepeval_correctness", "EM", "f1"]
for benchmark_folder in results["processed"]:
benchmark_path = Path(volume_path) / benchmark_folder
aggregate_csv_path = benchmark_path / "analysis" / "metrics_aggregate.csv"
if aggregate_csv_path.exists():
try:
# Read the aggregate metrics CSV
df = pd.read_csv(aggregate_csv_path, index_col=0)
# Calculate average of averages for each metric
benchmark_summary = {"benchmark": benchmark_folder, "questions_count": len(df)}
for metric in metrics:
mean_col = f"{metric}_mean"
if mean_col in df.columns:
benchmark_summary[f"{metric}_avg"] = df[mean_col].mean()
else:
benchmark_summary[f"{metric}_avg"] = None
summary_data.append(benchmark_summary)
print(f" ✅ Added {benchmark_folder}: {len(df)} questions")
except Exception as e:
print(f" ❌ Error reading {benchmark_folder}: {e}")
else:
print(f" ⚠️ No aggregate file found for {benchmark_folder}")
if summary_data:
# Create summary DataFrame
summary_df = pd.DataFrame(summary_data)
# Sort by overall performance (average of all metrics)
metric_cols = [f"{metric}_avg" for metric in metrics]
valid_metrics = [col for col in metric_cols if col in summary_df.columns]
if valid_metrics:
summary_df["overall_avg"] = summary_df[valid_metrics].mean(axis=1)
summary_df = summary_df.sort_values("overall_avg", ascending=False)
# Save summary CSV
summary_path = Path(volume_path) / "cross_benchmark_summary.csv"
summary_df.to_csv(summary_path, index=False)
print(f"📈 Cross-benchmark summary saved to: {summary_path}")
print(f"📊 Processed {len(summary_df)} benchmarks")
        # Print top performers by overall average
        print("\n🏆 Top 3 performers:")
        for rank, (_, row) in enumerate(summary_df.head(3).iterrows(), start=1):
            overall = row.get("overall_avg")
            overall_str = f"{overall:.4f}" if overall is not None and pd.notna(overall) else "N/A"
            print(f" {rank}. {row['benchmark']}: {overall_str}")
else:
print("❌ No benchmark data found for summary")


def print_summary(results: dict) -> None:
"""Print summary of processing results."""
print("\n" + "=" * 50)
print("📊 PROCESSING SUMMARY")
print("=" * 50)
print(f"✅ Successfully processed: {len(results['processed'])}")
print(f"⏭️ Skipped (already exists): {len(results['skipped'])}")
print(f"❌ Failed: {len(results['failed'])}")
if results["processed"]:
print("\n📁 Processed benchmarks:")
for folder in results["processed"]:
print(f" - {folder}")
if results["skipped"]:
print("\n⏭️ Skipped benchmarks:")
for folder in results["skipped"]:
print(f" - {folder}")


def main():
"""Main orchestration function."""
VOLUME_NAME = "qa-benchmarks"
DOWNLOAD_PATH = "temp"
print("🚀 Starting cross-benchmark analysis")
print(f"📦 Modal volume: {VOLUME_NAME}")
print(f"📁 Download path: {DOWNLOAD_PATH}")
print("-" * 50)
# Download modal volume
download_modal_volume(VOLUME_NAME, DOWNLOAD_PATH)
# Process all benchmarks
results = process_all_benchmarks(DOWNLOAD_PATH)
# Create cross-benchmark summary
create_cross_benchmark_summary(DOWNLOAD_PATH, results)
# Print summary
print_summary(results)
print("\n🎉 Cross-benchmark analysis completed!")


if __name__ == "__main__":
main()