Open Census MCP Server

by brockwebb
automated_eval_runner.py (24.7 kB)
#!/usr/bin/env python3
"""
Automated Census MCP Evaluation Runner
Uses Anthropic API to evaluate MCP performance against benchmark queries

Usage:
    python automated_eval_runner.py --run baseline
    python automated_eval_runner.py --run experiment --compare 1
"""

import json
import sqlite3
import time
import argparse
import os
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional
import logging

from anthropic import Anthropic

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MCPEvaluator:
    def __init__(self, db_path: str = "evaluation.db"):
        self.db_path = Path(db_path)

        # Load API key from .env
        self.api_key = os.getenv("ANTHROPIC_API_KEY")
        if not self.api_key:
            raise ValueError("ANTHROPIC_API_KEY not found in environment")

        self.client = Anthropic(api_key=self.api_key)

        # Initialize database
        self._init_database()

        # Load benchmark queries
        self.benchmark_queries = self._load_benchmark_queries()

    def _init_database(self):
        """Initialize SQLite database with evaluation schema"""
        with sqlite3.connect(self.db_path) as conn:
            conn.executescript('''
                CREATE TABLE IF NOT EXISTS evaluation_runs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    system_version TEXT NOT NULL,
                    description TEXT,
                    total_queries INTEGER,
                    overall_score REAL,
                    passed_queries INTEGER,
                    execution_time_seconds REAL
                );

                CREATE TABLE IF NOT EXISTS query_results (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    run_id INTEGER,
                    query_id TEXT NOT NULL,
                    query_text TEXT NOT NULL,
                    query_type TEXT NOT NULL,
                    category TEXT NOT NULL,
                    expected_result TEXT NOT NULL,
                    full_response TEXT,
                    mcp_calls_json TEXT,
                    reasoning_chain TEXT,
                    plan_quality REAL,
                    tool_coordination REAL,
                    adaptation REAL,
                    reasoning_clarity REAL,
                    correctness REAL,
                    passed BOOLEAN,
                    failure_reason TEXT,
                    execution_time_ms INTEGER,
                    FOREIGN KEY (run_id) REFERENCES evaluation_runs (id)
                );

                CREATE INDEX IF NOT EXISTS idx_run_id ON query_results (run_id);
                CREATE INDEX IF NOT EXISTS idx_query_id ON query_results (query_id);
            ''')

    def _load_benchmark_queries(self) -> List[Dict]:
        """Load benchmark queries"""
        return [
            {"id": "Q01", "type": "variable", "category": "core_demographics",
             "query": "total population", "truth": ["B01003_001E"]},
            {"id": "Q02", "type": "variable", "category": "core_demographics",
             "query": "median household income", "truth": ["B19013_001E"]},
            {"id": "Q03", "type": "variable", "category": "core_demographics",
             "query": "poverty count", "truth": ["B17001_002E"]},
            {"id": "Q04", "type": "variable", "category": "core_demographics",
             "query": "race breakdown", "truth": ["B02001_002E", "B02001_003E", "B02001_004E"]},
            {"id": "Q05", "type": "calculation", "category": "core_demographics",
             "query": "bachelor's degree or higher",
             "truth": ["B15003_022E", "B15003_023E", "B15003_024E", "B15003_025E"]},
            {"id": "Q06", "type": "variable", "category": "core_demographics",
             "query": "without health insurance", "truth": ["B27001_017E"]},
            {"id": "Q07", "type": "variable", "category": "core_demographics",
             "query": "median age", "truth": ["B01002_001E"]},
            {"id": "Q08", "type": "variable", "category": "housing_transport",
             "query": "owner vs renter occupied", "truth": ["B25003_002E", "B25003_003E"]},
            {"id": "Q09", "type": "variable", "category": "housing_transport",
             "query": "average commute time", "truth": ["B08303_001E"]},
            {"id": "Q10", "type": "geographic", "category": "geographic_complexity",
             "query": "most educated state", "truth": "state_comparison_B15003"},
"geographic_complexity", "query": "Washington income", "truth": "clarification_needed"}, {"id": "Q12", "type": "caveat", "category": "edge_cases", "query": "teacher salary", "truth": "educator_aggregation_caveat"}, {"id": "Q13", "type": "limitation", "category": "confusing", "query": "population of 123 Main Street", "truth": "address_level_not_available"}, {"id": "Q14", "type": "limitation", "category": "confusing", "query": "undocumented immigrants in Dallas", "truth": "immigration_status_not_collected"}, {"id": "Q15", "type": "concept", "category": "confusing", "query": "people who are both Black and Hispanic", "truth": "race_ethnicity_separate_variables"}, {"id": "Q16", "type": "clarification", "category": "confusing", "query": "average mean income", "truth": "median_vs_mean_clarification"}, {"id": "Q17", "type": "temporal", "category": "confusing", "query": "unemployment rate for March 2025", "truth": "monthly_data_not_available"}, {"id": "Q18", "type": "limitation", "category": "confusing", "query": "people with Medicaid coverage", "truth": "public_insurance_aggregated"}, {"id": "Q19", "type": "temporal", "category": "confusing", "query": "population in 2010", "truth": "decennial_vs_acs_confusion"}, {"id": "Q20", "type": "limitation", "category": "confusing", "query": "commute by Uber or rideshare", "truth": "rideshare_not_specified"} ] def run_evaluation(self, run_name: str, description: str = "") -> int: """Run complete evaluation and return run_id""" logger.info(f"Starting evaluation run: {run_name}") logger.info(f"Testing {len(self.benchmark_queries)} queries") # Create run record run_id = self._create_run_record(run_name, description) start_time = time.time() results = [] for i, query in enumerate(self.benchmark_queries, 1): logger.info(f"Query {i}/{len(self.benchmark_queries)}: {query['query']}") query_start = time.time() result = self._evaluate_single_query(query, run_id) result["execution_time_ms"] = (time.time() - query_start) * 1000 self._save_query_result(result, run_id) results.append(result) # Log result status = "✅ PASS" if result["passed"] else "❌ FAIL" logger.info(f" {status} (correctness: {result['correctness']:.2f}, plan: {result['plan_quality']:.2f})") # Update run record with summary total_time = time.time() - start_time self._update_run_summary(run_id, results, total_time) return run_id def _create_run_record(self, system_version: str, description: str) -> int: """Create evaluation run record and return run_id""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO evaluation_runs (timestamp, system_version, description, total_queries) VALUES (?, ?, ?, ?) 
    def _evaluate_single_query(self, query: Dict, run_id: int) -> Dict:
        """Evaluate a single query using Anthropic API with MCP"""
        try:
            # Create the API call with MCP-specific prompt
            prompt = self._create_mcp_prompt(query["query"])

            response = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2048,
                temperature=0.1,
                messages=[{"role": "user", "content": prompt}]
            )

            # Extract response content
            response_text = response.content[0].text if response.content else ""

            # Parse MCP function calls (if any)
            mcp_calls = self._extract_mcp_calls(response_text)

            # Score the response using both traditional and behavioral metrics
            scores = self._score_response_comprehensive(response_text, query, mcp_calls)

            return {
                "query_id": query["id"],
                "query_text": query["query"],
                "query_type": query["type"],
                "category": query["category"],
                "expected_result": json.dumps(query["truth"]),
                "full_response": response_text,
                "mcp_calls_json": json.dumps(mcp_calls),
                "reasoning_chain": self._extract_reasoning_chain(response_text),
                **scores
            }

        except Exception as e:
            logger.error(f"Error evaluating query {query['id']}: {str(e)}")
            return self._create_error_result(query, str(e))

    def _create_mcp_prompt(self, user_query: str) -> str:
        """Create prompt that encourages MCP tool usage"""
        return f"""You have access to Census MCP tools that can help answer demographic questions.

Please use the appropriate Census tools to answer this question: "{user_query}"

If the question involves demographic data, locations, or Census concepts, please use the Census MCP tools (get_demographic_data, compare_locations, or search_census_knowledge) to provide an accurate, well-sourced answer.

For questions that can't be answered with Census data, please explain why and suggest appropriate alternatives."""
    def _extract_mcp_calls(self, response_text: str) -> List[Dict]:
        """Extract MCP function calls from response text"""
        # Look for common MCP function patterns
        mcp_functions = ["get_demographic_data", "compare_locations", "search_census_knowledge"]

        calls = []
        for func in mcp_functions:
            if func in response_text:
                calls.append({"function": func, "detected": True})

        return calls

    def _extract_reasoning_chain(self, response_text: str) -> str:
        """Extract reasoning chain from response"""
        # Simple extraction - look for step-by-step reasoning
        lines = response_text.split('\n')
        reasoning_lines = []

        for line in lines:
            line = line.strip()
            if any(indicator in line.lower() for indicator in ['first', 'then', 'next', 'step', 'because', 'therefore']):
                reasoning_lines.append(line)

        return ' | '.join(reasoning_lines[:5])  # First 5 reasoning steps

    def _score_response_comprehensive(self, response_text: str, query: Dict, mcp_calls: List[Dict]) -> Dict:
        """Score response across multiple dimensions"""

        # Traditional correctness scoring
        correctness = self._score_correctness(response_text, query)

        # Behavioral scoring (from agent evaluation paper)
        plan_quality = self._score_plan_quality(response_text, query, mcp_calls)
        tool_coordination = self._score_tool_coordination(mcp_calls, query)
        adaptation = self._score_adaptation(response_text, query)
        reasoning_clarity = self._score_reasoning_clarity(response_text)

        # Overall pass/fail
        passed = correctness >= 0.7 and plan_quality >= 0.6
        failure_reason = "" if passed else self._get_failure_reason(correctness, plan_quality, query)

        return {
            "correctness": correctness,
            "plan_quality": plan_quality,
            "tool_coordination": tool_coordination,
            "adaptation": adaptation,
            "reasoning_clarity": reasoning_clarity,
            "passed": passed,
            "failure_reason": failure_reason
        }

    def _score_correctness(self, response_text: str, query: Dict) -> float:
        """Score traditional correctness"""
        query_type = query["type"]
        expected = query["truth"]
        response_lower = response_text.lower()

        if query_type == "variable":
            # Check if expected variable IDs are mentioned
            if isinstance(expected, list):
                found = sum(1 for var in expected if var.lower() in response_lower)
                return min(found / len(expected), 1.0)
            else:
                return 1.0 if str(expected).lower() in response_lower else 0.0

        elif query_type == "limitation":
            # Check for limitation acknowledgment
            limitation_words = ["not available", "limitation", "cannot", "not collected"]
            return 1.0 if any(word in response_lower for word in limitation_words) else 0.0

        elif query_type == "disambiguation":
            # Check for clarification request
            clarification_words = ["clarify", "which", "specify", "need more", "ambiguous"]
            return 1.0 if any(word in response_lower for word in clarification_words) else 0.0

        elif query_type == "caveat":
            # Check for appropriate warning/caveat
            caveat_words = ["caveat", "limitation", "note that", "however", "but"]
            return 1.0 if any(word in response_lower for word in caveat_words) else 0.0

        else:
            # Default scoring for other types
            return 0.8 if len(response_text) > 100 else 0.3
    def _score_plan_quality(self, response_text: str, query: Dict, mcp_calls: List[Dict]) -> float:
        """Score quality of strategy/plan chosen"""
        query_type = query["type"]

        # Check if appropriate MCP tools were used
        if query_type in ["variable", "geographic", "calculation"]:
            # Should use demographic tools
            demographic_tools = ["get_demographic_data", "compare_locations"]
            if any(call["function"] in demographic_tools for call in mcp_calls):
                return 0.9
            elif "search_census_knowledge" in [call["function"] for call in mcp_calls]:
                return 0.7  # Knowledge search is reasonable but not optimal
            else:
                return 0.3  # No appropriate tools used

        elif query_type in ["limitation", "caveat", "concept"]:
            # Should use knowledge search or explain without data
            if "search_census_knowledge" in [call["function"] for call in mcp_calls]:
                return 0.9
            elif len(response_text) > 200:  # Substantial explanation without tools
                return 0.7
            else:
                return 0.4

        else:
            # For other types, check if response is substantive
            return 0.8 if len(response_text) > 150 else 0.5

    def _score_tool_coordination(self, mcp_calls: List[Dict], query: Dict) -> float:
        """Score how well tools were coordinated"""
        if not mcp_calls:
            return 0.0

        # Simple coordination scoring
        if len(mcp_calls) == 1:
            return 0.8  # Single appropriate tool
        elif len(mcp_calls) > 1:
            return 0.9  # Multiple tools suggest good coordination
        else:
            return 0.0

    def _score_adaptation(self, response_text: str, query: Dict) -> float:
        """Score adaptation to query challenges"""
        response_lower = response_text.lower()

        # Check for adaptive language
        adaptive_phrases = [
            "instead", "alternative", "however", "but", "although",
            "different approach", "another way", "let me try"
        ]

        adaptation_score = sum(1 for phrase in adaptive_phrases if phrase in response_lower)
        return min(adaptation_score / 3.0, 1.0)  # Max score with 3+ adaptive phrases

    def _score_reasoning_clarity(self, response_text: str) -> float:
        """Score clarity of reasoning chain"""
        # Simple heuristics for reasoning clarity
        reasoning_indicators = [
            "because", "therefore", "first", "then", "next", "finally",
            "this means", "as a result", "consequently"
        ]

        response_lower = response_text.lower()
        reasoning_count = sum(1 for indicator in reasoning_indicators if indicator in response_lower)

        # Score based on structure and reasoning indicators
        structure_score = 0.3 if len(response_text) > 200 else 0.1
        reasoning_score = min(reasoning_count / 5.0, 0.7)  # Max 0.7 for reasoning indicators

        return structure_score + reasoning_score

    def _get_failure_reason(self, correctness: float, plan_quality: float, query: Dict) -> str:
        """Generate failure reason description"""
        reasons = []

        if correctness < 0.7:
            reasons.append(f"Low correctness ({correctness:.2f})")
        if plan_quality < 0.6:
            reasons.append(f"Poor plan quality ({plan_quality:.2f})")

        return "; ".join(reasons) if reasons else "Unknown failure"

    def _create_error_result(self, query: Dict, error_msg: str) -> Dict:
        """Create error result record"""
        return {
            "query_id": query["id"],
            "query_text": query["query"],
            "query_type": query["type"],
            "category": query["category"],
            "expected_result": json.dumps(query["truth"]),
            "full_response": f"ERROR: {error_msg}",
            "mcp_calls_json": "[]",
            "reasoning_chain": "",
            "correctness": 0.0,
            "plan_quality": 0.0,
            "tool_coordination": 0.0,
            "adaptation": 0.0,
            "reasoning_clarity": 0.0,
            "passed": False,
            "failure_reason": f"System error: {error_msg}"
        }

    def _save_query_result(self, result: Dict, run_id: int):
        """Save query result to database"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                INSERT INTO query_results (
                    run_id, query_id, query_text, query_type, category,
                    expected_result, full_response, mcp_calls_json, reasoning_chain,
                    plan_quality, tool_coordination, adaptation, reasoning_clarity,
                    correctness, passed, failure_reason, execution_time_ms
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                run_id, result["query_id"], result["query_text"], result["query_type"],
                result["category"], result["expected_result"], result["full_response"],
                result["mcp_calls_json"], result["reasoning_chain"], result["plan_quality"],
                result["tool_coordination"], result["adaptation"], result["reasoning_clarity"],
                result["correctness"], result["passed"], result["failure_reason"],
                result["execution_time_ms"]
            ))

    def _update_run_summary(self, run_id: int, results: List[Dict], total_time: float):
        """Update run record with summary statistics"""
        passed_count = sum(1 for r in results if r["passed"])
        overall_score = passed_count / len(results) if results else 0

        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                UPDATE evaluation_runs
                SET overall_score = ?, passed_queries = ?, execution_time_seconds = ?
                WHERE id = ?
            ''', (overall_score, passed_count, total_time, run_id))

    def print_run_summary(self, run_id: int):
        """Print summary of evaluation run"""
        with sqlite3.connect(self.db_path) as conn:
            # Get run info
            run_info = conn.execute(
                "SELECT * FROM evaluation_runs WHERE id = ?", (run_id,)
            ).fetchone()

            if not run_info:
                print(f"Run {run_id} not found")
                return

            # Get detailed results
            results = conn.execute(
                "SELECT * FROM query_results WHERE run_id = ?", (run_id,)
            ).fetchall()

            print(f"\n{'='*60}")
            print(f"EVALUATION RESULTS - {run_info[2]} ({run_info[1]})")
            print(f"{'='*60}")
            print(f"Overall Score: {run_info[5]:.1%} ({run_info[6]}/{run_info[4]})")
            print(f"Execution Time: {run_info[7]:.1f} seconds")

            # Category breakdown
            category_stats = {}
            for result in results:
                category = result[5]  # category column
                if category not in category_stats:
                    category_stats[category] = {"total": 0, "passed": 0}
                category_stats[category]["total"] += 1
                if result[15]:  # passed column
                    category_stats[category]["passed"] += 1

            print(f"\nBy Category:")
            for category, stats in category_stats.items():
                pct = stats["passed"] / stats["total"] * 100 if stats["total"] > 0 else 0
                print(f"  {category}: {pct:.1f}% ({stats['passed']}/{stats['total']})")

            # Average behavioral scores
            if results:
                avg_plan = sum(r[10] for r in results) / len(results)       # plan_quality column
                avg_tool = sum(r[11] for r in results) / len(results)       # tool_coordination column
                avg_adapt = sum(r[12] for r in results) / len(results)      # adaptation column
                avg_reasoning = sum(r[13] for r in results) / len(results)  # reasoning_clarity column

                print(f"\nBehavioral Scores (Avg):")
                print(f"  Plan Quality: {avg_plan:.2f}")
                print(f"  Tool Coordination: {avg_tool:.2f}")
                print(f"  Adaptation: {avg_adapt:.2f}")
                print(f"  Reasoning Clarity: {avg_reasoning:.2f}")

    def compare_runs(self, run1_id: int, run2_id: int):
        """Compare two evaluation runs"""
        with sqlite3.connect(self.db_path) as conn:
            run1 = conn.execute("SELECT * FROM evaluation_runs WHERE id = ?", (run1_id,)).fetchone()
            run2 = conn.execute("SELECT * FROM evaluation_runs WHERE id = ?", (run2_id,)).fetchone()

            if not run1 or not run2:
                print("One or both runs not found")
                return

            print(f"\n{'='*60}")
            print(f"COMPARISON: {run1[2]} vs {run2[2]}")
            print(f"{'='*60}")
            print(f"{run1[2]}: {run1[5]:.1%} ({run1[6]}/{run1[4]})")
            print(f"{run2[2]}: {run2[5]:.1%} ({run2[6]}/{run2[4]})")

            improvement = run2[5] - run1[5]
            print(f"Improvement: {improvement:+.1%}")
            print(f"Result: {'✅ IMPROVED' if improvement > 0 else '❌ REGRESSED' if improvement < 0 else '➡️ NO CHANGE'}")


def main():
    parser = argparse.ArgumentParser(description="Run Census MCP automated evaluation")
    parser.add_argument("--run", required=True, help="Run name/version (e.g., 'baseline', 'v2.1')")
parser.add_argument("--description", default="", help="Run description") parser.add_argument("--compare", help="Compare with previous run ID") args = parser.parse_args() # Load environment variables from dotenv import load_dotenv load_dotenv() # Run evaluation evaluator = MCPEvaluator() run_id = evaluator.run_evaluation(args.run, args.description) # Print results evaluator.print_run_summary(run_id) # Compare if requested if args.compare: try: baseline_id = int(args.compare) evaluator.compare_runs(baseline_id, run_id) except ValueError: print(f"Invalid comparison run ID: {args.compare}") print(f"\nResults saved to database: {evaluator.db_path}") print(f"Run ID: {run_id}") if __name__ == "__main__": main()
