
ToGMAL MCP Server

postprocess_benchmark_data.py (11.8 kB)
```python
#!/usr/bin/env python3
"""
Post-Process Benchmark Data
===========================

Strategy:
1. Load raw benchmark results
2. Stratify by difficulty tier (low/medium/high success)
3. Select balanced sample for vector DB:
   - 30% LOW success (0-30%): Hard questions - model limitations
   - 40% MEDIUM success (30-70%): Capability boundary - most interesting
   - 30% HIGH success (70-100%): Within capability - baseline
4. Export stratified sample for vector DB indexing

This ensures we have good coverage across the capability spectrum.
"""

import json
import logging
import random
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Tuple

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class BenchmarkDataPostProcessor:
    """Post-process raw benchmark data for vector DB indexing."""

    def __init__(self, input_file: Path = Path("./data/benchmark_results/raw_benchmark_results.json")):
        self.input_file = input_file
        self.questions: Dict[str, Any] = {}
        self.stratified_sample: Dict[str, Any] = {}

    def load_raw_data(self) -> Dict[str, Any]:
        """Load raw benchmark results."""
        logger.info(f"Loading raw data from {self.input_file}...")
        with open(self.input_file, 'r') as f:
            data = json.load(f)
        self.questions = data['questions']
        logger.info(f"Loaded {len(self.questions)} questions")
        return self.questions

    def analyze_difficulty_distribution(self) -> Tuple[Dict[str, List[str]], Dict[str, Any]]:
        """Analyze distribution across difficulty tiers."""
        logger.info("Analyzing difficulty distribution...")

        distribution: Dict[str, List[str]] = {
            "low": [],     # 0-30% success
            "medium": [],  # 30-70% success
            "high": []     # 70-100% success
        }

        for qid, q in self.questions.items():
            tier = q.get('difficulty_tier')
            if tier and tier in distribution:
                distribution[tier].append(qid)

        total = len(self.questions) or 1  # guard against division by zero on empty input
        stats = {
            "total_questions": len(self.questions),
            "low_success_count": len(distribution["low"]),
            "medium_success_count": len(distribution["medium"]),
            "high_success_count": len(distribution["high"]),
            "low_success_pct": len(distribution["low"]) / total * 100,
            "medium_success_pct": len(distribution["medium"]) / total * 100,
            "high_success_pct": len(distribution["high"]) / total * 100
        }

        logger.info(f"  LOW success (0-30%): {stats['low_success_count']} ({stats['low_success_pct']:.1f}%)")
        logger.info(f"  MEDIUM success (30-70%): {stats['medium_success_count']} ({stats['medium_success_pct']:.1f}%)")
        logger.info(f"  HIGH success (70-100%): {stats['high_success_count']} ({stats['high_success_pct']:.1f}%)")

        return distribution, stats

    def stratified_sampling(
        self,
        target_size: int = 1000,
        low_pct: float = 0.30,
        medium_pct: float = 0.40,
        high_pct: float = 0.30
    ) -> Dict[str, Any]:
        """
        Create a stratified sample with a balanced difficulty distribution.

        Args:
            target_size: Total number of questions to sample
            low_pct: Fraction of the sample from LOW success questions (0-30% success)
            medium_pct: Fraction of the sample from MEDIUM success questions (30-70%)
            high_pct: Fraction of the sample from HIGH success questions (70-100%)
        """
        logger.info(f"Creating stratified sample (target: {target_size} questions)...")
        logger.info(f"  Target distribution: {low_pct*100:.0f}% low, {medium_pct*100:.0f}% medium, {high_pct*100:.0f}% high")

        distribution, _ = self.analyze_difficulty_distribution()

        # Calculate target counts per tier
        target_counts = {
            "low": int(target_size * low_pct),
            "medium": int(target_size * medium_pct),
            "high": int(target_size * high_pct)
        }

        sampled = {}
        random.seed(42)  # Reproducibility

        for tier, target_count in target_counts.items():
            available = distribution[tier]
            if len(available) >= target_count:
                # Sample from available
                selected = random.sample(available, target_count)
            else:
                # Take all available
                selected = available
                logger.warning(f"  Only {len(available)} {tier} questions available (target: {target_count})")

            for qid in selected:
                sampled[qid] = self.questions[qid]
            logger.info(f"  Sampled {len(selected)} {tier} success questions")

        self.stratified_sample = sampled
        logger.info(f"Total sampled: {len(sampled)} questions")
        return sampled

    def export_for_vector_db(self, output_file: Path = Path("./data/benchmark_results/stratified_sample.json")) -> Path:
        """Export the stratified sample in a format ready for the vector DB."""
        logger.info(f"Exporting stratified sample to {output_file}...")

        export_data = {
            "metadata": {
                "total_questions": len(self.stratified_sample),
                "sampling_strategy": "stratified_by_difficulty",
                "tiers": {
                    "low": "0-30% success rate",
                    "medium": "30-70% success rate",
                    "high": "70-100% success rate"
                }
            },
            "questions": []
        }

        # Tally tiers and source benchmarks for the metadata summary
        tier_counts = defaultdict(int)
        benchmark_counts = defaultdict(int)

        for qid, q in self.stratified_sample.items():
            tier_counts[q.get('difficulty_tier', 'unknown')] += 1
            benchmark_counts[q.get('source_benchmark', 'unknown')] += 1

            # Simplify each question for export
            export_q = {
                "question_id": qid,
                "source_benchmark": q['source_benchmark'],
                "domain": q['domain'],
                "question_text": q['question_text'],
                "correct_answer": q['correct_answer'],
                "choices": q.get('choices'),
                "success_rate": q.get('success_rate'),
                "difficulty_tier": q.get('difficulty_tier'),
                "difficulty_label": q.get('difficulty_label'),
                "num_models_tested": q.get('num_models', 0)
            }
            export_data["questions"].append(export_q)

        export_data["metadata"]["distribution"] = {
            "by_tier": dict(tier_counts),
            "by_benchmark": dict(benchmark_counts)
        }

        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w') as f:
            json.dump(export_data, f, indent=2)

        logger.info(f"✓ Exported {len(export_data['questions'])} questions")
        logger.info(f"  By tier: {dict(tier_counts)}")
        logger.info(f"  By benchmark: {dict(benchmark_counts)}")
        return output_file

    def generate_summary_report(self) -> str:
        """Generate a markdown summary report."""
        report = ["# Benchmark Data Post-Processing Report\n"]

        # Overall stats
        report.append("## Overall Statistics\n")
        report.append(f"- **Total questions collected**: {len(self.questions)}")
        report.append(f"- **Stratified sample size**: {len(self.stratified_sample)}\n")

        # Difficulty distribution
        report.append("## Difficulty Distribution\n")
        tier_counts = defaultdict(int)
        for q in self.stratified_sample.values():
            tier_counts[q.get('difficulty_tier', 'unknown')] += 1

        report.append("| Tier | Count | Percentage | Description |")
        report.append("|------|-------|------------|-------------|")
        total = len(self.stratified_sample)
        for tier in ['low', 'medium', 'high']:
            count = tier_counts[tier]
            pct = count / total * 100 if total > 0 else 0
            desc = {
                'low': 'Hard - model limitations (0-30% success)',
                'medium': 'Capability boundary (30-70% success)',
                'high': 'Within capability (70-100% success)'
            }[tier]
            report.append(f"| {tier.upper()} | {count} | {pct:.1f}% | {desc} |")
        report.append("\n")

        # Benchmark distribution
        report.append("## Source Benchmark Distribution\n")
        benchmark_counts = defaultdict(int)
        for q in self.stratified_sample.values():
            benchmark_counts[q.get('source_benchmark', 'unknown')] += 1

        report.append("| Benchmark | Count | Percentage |")
        report.append("|-----------|-------|------------|")
        for benchmark, count in sorted(benchmark_counts.items()):
            pct = count / total * 100 if total > 0 else 0
            report.append(f"| {benchmark} | {count} | {pct:.1f}% |")
        report.append("\n")

        # Success rate stats
        report.append("## Success Rate Statistics\n")
        success_rates = [
            q.get('success_rate', 0) for q in self.stratified_sample.values()
            if q.get('success_rate') is not None
        ]
        if success_rates:
            import numpy as np  # lazy import: only needed when stats are computed
            report.append(f"- **Min**: {np.min(success_rates):.1%}")
            report.append(f"- **Max**: {np.max(success_rates):.1%}")
            report.append(f"- **Mean**: {np.mean(success_rates):.1%}")
            report.append(f"- **Median**: {np.median(success_rates):.1%}\n")

        # Next steps
        report.append("## Next Steps\n")
        report.append("1. Load stratified sample into vector database")
        report.append("2. Generate embeddings for all questions")
        report.append("3. Test difficulty assessment on real prompts")
        report.append("4. Validate accuracy against known hard/easy questions\n")

        return "\n".join(report)

    def save_summary_report(self, output_file: Path = Path("./data/benchmark_results/PROCESSING_REPORT.md")) -> Path:
        """Save the summary report to disk."""
        report = self.generate_summary_report()
        with open(output_file, 'w') as f:
            f.write(report)
        logger.info(f"Saved summary report to {output_file}")
        return output_file


def main():
    """Main execution."""
    logger.info("=" * 80)
    logger.info("Post-Processing Benchmark Data")
    logger.info("=" * 80)

    processor = BenchmarkDataPostProcessor()

    # Load raw data and analyze the tier distribution
    processor.load_raw_data()
    processor.analyze_difficulty_distribution()

    # Create stratified sample: 1000 questions with 30% low, 40% medium, 30% high
    processor.stratified_sampling(
        target_size=1000,
        low_pct=0.30,
        medium_pct=0.40,
        high_pct=0.30
    )

    # Export for vector DB and write the summary report
    export_path = processor.export_for_vector_db()
    report_path = processor.save_summary_report()

    print("\n" + processor.generate_summary_report())
    print("=" * 80)
    print("✓ Post-processing complete!")
    print("=" * 80)
    print("\nOutput files:")
    print(f"  - Stratified sample: {export_path}")
    print(f"  - Summary report: {report_path}")
    print("\nNext: Run vector DB builder with stratified sample")


if __name__ == "__main__":
    main()
```
