"""
Run evaluation for multiple searcher types and save the results.

By default only the Gemini Flash searchers are evaluated; pass --all (or
--searchers) to cover other searcher types. Each searcher's results are
written to a separate timestamped JSON file, and a combined summary report
comparing all evaluated searchers is produced.
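
Example invocations (illustrative; adjust the module path to your project layout):

    python -m <package>.run_all_searchers --output-dir eval_results
    python -m <package>.run_all_searchers --all --stability --runs 5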
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
from src.core import SearcherType
from .run_eval import (
AggregatedMetrics,
AggregatedStabilityMetrics,
main,
main_with_stability,
)
def sanitize_filename(name: str) -> str:
"""Convert searcher type name to safe filename."""
return name.replace("/", "_").replace("\\", "_").replace(" ", "_")
def run_evaluation_for_searcher(
searcher_type: SearcherType,
dataset_path: Optional[Path] = None,
dataset_easy_path: Optional[Path] = None,
repos_base_path: Optional[Path] = None,
output_dir: Path = Path("eval_results"),
stability: bool = False,
num_runs: int = 10,
max_workers: int = 10,
verbose: bool = False,
use_both_datasets: bool = True,
) -> tuple[Path, AggregatedMetrics | AggregatedStabilityMetrics]:
"""
    Run evaluation for a single searcher type and write its results to a JSON file.

    Returns:
        Tuple of (output_file_path, aggregated_results)
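
    Example (illustrative):
        output_file, metrics = run_evaluation_for_searcher(
            searcher_type=SearcherType.SGR_GEMINI_FLASH_LITE,
            output_dir=Path("eval_results"),
        )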
"""
    # Point SEMANTIC_SEARCHER at this searcher for the duration of the run
original_searcher = os.environ.get("SEMANTIC_SEARCHER")
os.environ["SEMANTIC_SEARCHER"] = searcher_type.value
try:
# Create output directory
output_dir.mkdir(parents=True, exist_ok=True)
# Generate output filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
searcher_name = sanitize_filename(searcher_type.value)
suffix = "_stability" if stability else ""
output_file = output_dir / f"{searcher_name}{suffix}_{timestamp}.json"
print(f"\n{'='*80}")
print(f"Evaluating searcher: {searcher_type.value}")
print(f"{'='*80}")
# Run evaluation
if stability:
result = main_with_stability(
dataset_path=dataset_path,
dataset_easy_path=dataset_easy_path,
repos_base_path=repos_base_path,
output_json=output_file,
verbose=verbose,
num_runs=num_runs,
max_workers=max_workers,
use_both_datasets=use_both_datasets,
)
else:
result = main(
dataset_path=dataset_path,
dataset_easy_path=dataset_easy_path,
repos_base_path=repos_base_path,
output_json=output_file,
verbose=verbose,
use_both_datasets=use_both_datasets,
)
print(f"\n✓ Results saved to: {output_file}")
return output_file, result
finally:
# Restore original environment variable
if original_searcher is not None:
os.environ["SEMANTIC_SEARCHER"] = original_searcher
elif "SEMANTIC_SEARCHER" in os.environ:
del os.environ["SEMANTIC_SEARCHER"]
def create_summary_report(
results: dict[str, tuple[Path, AggregatedMetrics | AggregatedStabilityMetrics]],
output_dir: Path,
stability: bool = False,
) -> Path:
"""Create a summary report comparing all searchers."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
suffix = "_stability" if stability else ""
summary_file = output_dir / f"summary{suffix}_{timestamp}.json"
summary = {
"timestamp": timestamp,
"stability_mode": stability,
"searchers": {},
}
for searcher_name, (output_path, result) in results.items():
if stability and isinstance(result, AggregatedStabilityMetrics):
summary["searchers"][searcher_name] = {
"output_file": str(output_path),
"num_queries": result.num_queries,
"num_runs_per_query": result.num_runs_per_query,
"metrics": {
"precision": {
"mean": result.precision.mean,
"std": result.precision.std,
"cv": result.precision.cv,
},
"recall": {
"mean": result.recall.mean,
"std": result.recall.std,
"cv": result.recall.cv,
},
"f1": {
"mean": result.f1.mean,
"std": result.f1.std,
"cv": result.f1.cv,
},
"success_rate": {
"mean": result.success_rate.mean,
"std": result.success_rate.std,
"cv": result.success_rate.cv,
},
"file_discovery_rate": {
"mean": result.file_discovery_rate.mean,
"std": result.file_discovery_rate.std,
"cv": result.file_discovery_rate.cv,
},
"substring_coverage": {
"mean": result.substring_coverage.mean,
"std": result.substring_coverage.std,
"cv": result.substring_coverage.cv,
},
},
"stability": {
"avg_stability_score": result.avg_stability_score,
"stable_queries_count": result.stable_queries_count,
},
"latency": {
"mean_ms": result.execution_time.mean,
"std_ms": result.execution_time.std,
"cv": result.execution_time.cv,
},
}
elif not stability and isinstance(result, AggregatedMetrics):
summary["searchers"][searcher_name] = {
"output_file": str(output_path),
"num_queries": result.num_queries,
"metrics": {
"precision": result.macro_precision,
"recall": result.macro_recall,
"f1": result.macro_f1,
"success_rate": result.success_rate,
"file_discovery_rate": result.avg_file_discovery_rate,
"substring_coverage": result.avg_substring_coverage,
},
"latency": {
"avg_ms": result.avg_time_ms,
"min_ms": result.min_time_ms,
"max_ms": result.max_time_ms,
"queries_under_10s": result.queries_meeting_latency_target,
},
}
# Add ranking
if stability:
# Rank by F1 mean
ranked = sorted(
summary["searchers"].items(),
key=lambda x: x[1]["metrics"]["f1"]["mean"],
reverse=True,
)
else:
ranked = sorted(
summary["searchers"].items(),
key=lambda x: x[1]["metrics"]["f1"],
reverse=True,
)
summary["ranking"] = [name for name, _ in ranked]
with summary_file.open("w", encoding="utf-8") as f:
json.dump(summary, f, indent=2)
print(f"\n{'='*80}")
print(f"SUMMARY REPORT")
print(f"{'='*80}")
print(f"Results saved to: {summary_file}")
print(f"\nRanking by F1 score:")
for i, searcher_name in enumerate(ranked[:10], 1): # Top 10
data = summary["searchers"][searcher_name]
if stability:
f1 = data["metrics"]["f1"]["mean"]
f1_std = data["metrics"]["f1"]["std"]
print(f" {i:2d}. {searcher_name:30s} F1={f1:.4f}±{f1_std:.4f}")
else:
f1 = data["metrics"]["f1"]
print(f" {i:2d}. {searcher_name:30s} F1={f1:.4f}")
return summary_file
def get_gemini_flash_searchers() -> list[SearcherType]:
"""Get list of searchers that use Gemini Flash."""
return [
# SearcherType.AGENT_GEMINI_FLASH,
SearcherType.SGR_GEMINI_FLASH_LITE,
]
def main_all_searchers(
searchers: Optional[list[str]] = None,
exclude_searchers: Optional[list[str]] = None,
dataset_path: Optional[Path] = None,
dataset_easy_path: Optional[Path] = None,
repos_base_path: Optional[Path] = None,
output_dir: Path = Path("eval_results"),
stability: bool = False,
num_runs: int = 10,
max_workers: int = 10,
verbose: bool = False,
use_both_datasets: bool = True,
skip_failed: bool = True,
all_searchers: bool = False,
) -> None:
"""
Run evaluation for all or specified searchers.
Args:
searchers: List of searcher type names to evaluate. If None, evaluates Gemini Flash searchers by default.
exclude_searchers: List of searcher type names to skip.
dataset_path: Path to main dataset
dataset_easy_path: Path to easy dataset
repos_base_path: Base path for repositories
output_dir: Directory to save results
stability: Run stability evaluation (multiple runs per query)
num_runs: Number of runs per query in stability mode
max_workers: Maximum parallel workers
verbose: Print detailed statistics
use_both_datasets: Use both main and easy datasets
skip_failed: Skip searchers that fail to initialize
all_searchers: If True, evaluate all searchers instead of just Gemini Flash ones
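
    Example (illustrative):
        main_all_searchers(all_searchers=True, stability=True, num_runs=5)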
"""
# Get all searcher types
all_searcher_types = list(SearcherType)
# Filter searchers
if searchers:
searcher_types = [
st for st in all_searcher_types
if st.value in searchers
]
if not searcher_types:
print(f"Error: No valid searchers found in: {searchers}")
print(f"Available searchers: {[st.value for st in all_searcher_types]}")
sys.exit(1)
elif all_searchers:
searcher_types = all_searcher_types
else:
# Default: only Gemini Flash searchers
searcher_types = get_gemini_flash_searchers()
if exclude_searchers:
searcher_types = [
st for st in searcher_types
if st.value not in exclude_searchers
]
print(f"\n{'='*80}")
    if all_searchers or searchers:
        print("EVALUATION FOR SELECTED SEARCHERS")
    else:
        print("EVALUATION FOR GEMINI FLASH SEARCHERS")
    print(f"{'='*80}")
    print(f"Total searchers to evaluate: {len(searcher_types)}")
    if not all_searchers and not searchers:
        print(" (Using only Gemini Flash searchers by default)")
        print(" (Use --all to evaluate all searchers)")
print(f"Searchers: {', '.join(st.value for st in searcher_types)}")
print(f"Output directory: {output_dir.absolute()}")
print(f"Stability mode: {stability}")
if stability:
print(f"Runs per query: {num_runs}")
print(f"{'='*80}\n")
results: dict[str, tuple[Path, AggregatedMetrics | AggregatedStabilityMetrics]] = {}
failed_searchers: list[str] = []
for i, searcher_type in enumerate(searcher_types, 1):
print(f"\n[{i}/{len(searcher_types)}] Processing: {searcher_type.value}")
try:
output_file, result = run_evaluation_for_searcher(
searcher_type=searcher_type,
dataset_path=dataset_path,
dataset_easy_path=dataset_easy_path,
repos_base_path=repos_base_path,
output_dir=output_dir,
stability=stability,
num_runs=num_runs,
max_workers=max_workers,
verbose=verbose,
use_both_datasets=use_both_datasets,
)
results[searcher_type.value] = (output_file, result)
except Exception as e:
error_msg = f"Failed to evaluate {searcher_type.value}: {e}"
print(f"\n❌ {error_msg}")
if not skip_failed:
raise
failed_searchers.append(searcher_type.value)
# Create summary report
if results:
summary_file = create_summary_report(results, output_dir, stability)
print(f"\n✓ Summary report: {summary_file}")
else:
print("\n❌ No searchers were successfully evaluated!")
sys.exit(1)
# Report failures
if failed_searchers:
print(f"\n⚠ Failed searchers ({len(failed_searchers)}):")
for name in failed_searchers:
print(f" - {name}")
print(f"\n{'='*80}")
print(f"EVALUATION COMPLETE")
print(f"{'='*80}")
print(f"Successfully evaluated: {len(results)}/{len(searcher_types)} searchers")
print(f"Results directory: {output_dir.absolute()}")
if __name__ == "__main__":
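    # Example CLI usage (illustrative; <searcher-value> stands for a SearcherType value
    # and <package> for this module's package):
    #   python -m <package>.run_all_searchers --searchers <searcher-value> --verbose
    #   python -m <package>.run_all_searchers --all --exclude <searcher-value> --single-dataset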
parser = argparse.ArgumentParser(
description="Run evaluation for all available searchers"
)
parser.add_argument(
"--searchers",
nargs="+",
help="Specific searcher types to evaluate (default: Gemini Flash searchers only)",
)
parser.add_argument(
"--all",
action="store_true",
help="Evaluate all searchers instead of just Gemini Flash ones",
)
parser.add_argument(
"--exclude",
nargs="+",
help="Searcher types to exclude",
)
parser.add_argument(
"--dataset",
type=Path,
help="Path to main JSONL dataset file",
)
parser.add_argument(
"--dataset-easy",
type=Path,
help="Path to easy JSONL dataset file",
)
parser.add_argument(
"--repos",
type=Path,
help="Base path for evaluation repositories",
)
parser.add_argument(
"--output-dir",
type=Path,
default=Path("eval_results"),
help="Directory to save results (default: eval_results)",
)
parser.add_argument(
"--stability",
action="store_true",
help="Run stability evaluation (multiple runs per query)",
)
parser.add_argument(
"--runs",
type=int,
default=10,
help="Number of runs per query in stability mode (default: 10)",
)
parser.add_argument(
"--workers",
type=int,
default=10,
help="Maximum parallel workers for stability runs (default: 10)",
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Print detailed per-query statistics",
)
parser.add_argument(
"--single-dataset",
action="store_true",
help="Use only the main dataset",
)
parser.add_argument(
"--no-skip-failed",
action="store_true",
help="Stop on first failed searcher instead of skipping",
)
args = parser.parse_args()
main_all_searchers(
searchers=args.searchers,
exclude_searchers=args.exclude,
dataset_path=args.dataset,
dataset_easy_path=args.dataset_easy,
repos_base_path=args.repos,
output_dir=args.output_dir,
stability=args.stability,
num_runs=args.runs,
max_workers=args.workers,
verbose=args.verbose,
use_both_datasets=not args.single_dataset,
skip_failed=not args.no_skip_failed,
all_searchers=args.all,
)