Skip to main content
Glama

FedMCP - Federal Parliamentary Information

test_postgresql_neo4j_validation.py15.4 kB
"""Comprehensive validation script comparing PostgreSQL and Neo4j data. This script validates that data has been correctly migrated from the OpenParliament PostgreSQL database to the Neo4j graph database. It checks: 1. Record counts match between PostgreSQL and Neo4j 2. Relationships are properly created 3. No orphaned data exists 4. Data integrity constraints are met """ import sys import os from pathlib import Path from dotenv import load_dotenv from typing import Dict, List, Tuple # Add packages to path sys.path.insert(0, str(Path(__file__).parent / "packages/data-pipeline")) from fedmcp_pipeline.utils.neo4j_client import Neo4jClient from fedmcp_pipeline.utils.postgres_client import PostgresClient # Load environment load_dotenv(Path(__file__).parent / "packages/data-pipeline/.env") # ANSI color codes GREEN = "\033[92m" RED = "\033[91m" YELLOW = "\033[93m" BLUE = "\033[94m" RESET = "\033[0m" class ValidationResult: """Stores validation results for a single check.""" def __init__(self, name: str, passed: bool, message: str, details: Dict = None): self.name = name self.passed = passed self.message = message self.details = details or {} class DataValidator: """Validates data consistency between PostgreSQL and Neo4j.""" def __init__(self, postgres_client: PostgresClient, neo4j_client: Neo4jClient): self.postgres = postgres_client self.neo4j = neo4j_client self.results: List[ValidationResult] = [] def validate_count(self, name: str, pg_query: str, neo4j_query: str, tolerance: int = 0) -> ValidationResult: """Compare counts between PostgreSQL and Neo4j. Args: name: Name of the validation check pg_query: PostgreSQL COUNT query neo4j_query: Neo4j count query tolerance: Acceptable difference (for eventual consistency) """ try: # Get PostgreSQL count pg_result = self.postgres.execute_query(pg_query) pg_count = pg_result[0]['count'] if pg_result else 0 # Get Neo4j count neo4j_result = self.neo4j.run_query(neo4j_query) neo4j_count = neo4j_result[0]['count'] if neo4j_result else 0 # Check if counts match within tolerance diff = abs(pg_count - neo4j_count) passed = diff <= tolerance message = f"PostgreSQL: {pg_count:,} | Neo4j: {neo4j_count:,}" if diff > 0: message += f" | Difference: {diff:,}" return ValidationResult( name=name, passed=passed, message=message, details={ 'postgresql_count': pg_count, 'neo4j_count': neo4j_count, 'difference': diff } ) except Exception as e: return ValidationResult( name=name, passed=False, message=f"Error: {str(e)}", details={'error': str(e)} ) def validate_relationship_coverage(self, name: str, neo4j_node_query: str, neo4j_relationship_query: str, min_coverage_percent: float = 95.0) -> ValidationResult: """Validate that most nodes have expected relationships. Args: name: Name of the validation check neo4j_node_query: Query to count total nodes neo4j_relationship_query: Query to count nodes with relationships min_coverage_percent: Minimum acceptable coverage percentage """ try: # Get total nodes total_result = self.neo4j.run_query(neo4j_node_query) total = total_result[0]['count'] if total_result else 0 # Get nodes with relationships with_rel_result = self.neo4j.run_query(neo4j_relationship_query) with_rel = with_rel_result[0]['count'] if with_rel_result else 0 if total == 0: return ValidationResult( name=name, passed=True, message="No nodes to validate", details={'total': 0, 'with_relationships': 0, 'coverage': 0} ) coverage = (with_rel / total) * 100 passed = coverage >= min_coverage_percent message = f"{with_rel:,} / {total:,} ({coverage:.1f}%)" if not passed: message += f" - Below {min_coverage_percent}% threshold" return ValidationResult( name=name, passed=passed, message=message, details={ 'total': total, 'with_relationships': with_rel, 'coverage': coverage, 'min_coverage': min_coverage_percent } ) except Exception as e: return ValidationResult( name=name, passed=False, message=f"Error: {str(e)}", details={'error': str(e)} ) def validate_orphans(self, name: str, neo4j_query: str, max_orphans: int = 0) -> ValidationResult: """Check for orphaned data (nodes without expected relationships). Args: name: Name of the validation check neo4j_query: Query to count orphaned nodes max_orphans: Maximum acceptable number of orphans """ try: result = self.neo4j.run_query(neo4j_query) orphan_count = result[0]['count'] if result else 0 passed = orphan_count <= max_orphans message = f"Found {orphan_count:,} orphaned nodes" if orphan_count > max_orphans: message += f" - Exceeds {max_orphans} threshold" return ValidationResult( name=name, passed=passed, message=message, details={'orphan_count': orphan_count, 'max_orphans': max_orphans} ) except Exception as e: return ValidationResult( name=name, passed=False, message=f"Error: {str(e)}", details={'error': str(e)} ) def run_all_validations(self): """Run all validation checks.""" print("=" * 80) print(f"{BLUE}POSTGRESQL ↔ NEO4J DATA VALIDATION{RESET}") print("=" * 80) # 1. Hansard Statements print(f"\n{BLUE}1. HANSARD STATEMENTS{RESET}") print("-" * 80) result = self.validate_count( name="Hansard Statement Count", pg_query="SELECT COUNT(*) as count FROM hansards_statement", neo4j_query="MATCH (s:Statement) RETURN count(s) as count" ) self.results.append(result) self._print_result(result) result = self.validate_count( name="Hansard Document Count", pg_query="SELECT COUNT(*) as count FROM hansards_document", neo4j_query="MATCH (d:Document) RETURN count(d) as count" ) self.results.append(result) self._print_result(result) result = self.validate_relationship_coverage( name="Statements with PART_OF relationship", neo4j_node_query="MATCH (s:Statement) RETURN count(s) as count", neo4j_relationship_query=""" MATCH (s:Statement)-[:PART_OF]->() RETURN count(DISTINCT s) as count """, min_coverage_percent=99.0 ) self.results.append(result) self._print_result(result) result = self.validate_orphans( name="Statements without documents", neo4j_query=""" MATCH (s:Statement) WHERE NOT exists((s)-[:PART_OF]->()) RETURN count(s) as count """, max_orphans=100 ) self.results.append(result) self._print_result(result) # 2. Bill Texts print(f"\n{BLUE}2. BILL TEXTS{RESET}") print("-" * 80) result = self.validate_count( name="Bill Text Count", pg_query="SELECT COUNT(*) as count FROM bills_billtext", neo4j_query="MATCH (bt:BillText) RETURN count(bt) as count" ) self.results.append(result) self._print_result(result) result = self.validate_relationship_coverage( name="BillTexts with HAS_TEXT relationship", neo4j_node_query="MATCH (bt:BillText) RETURN count(bt) as count", neo4j_relationship_query=""" MATCH ()-[:HAS_TEXT]->(bt:BillText) RETURN count(DISTINCT bt) as count """, min_coverage_percent=95.0 ) self.results.append(result) self._print_result(result) # 3. Politician Info print(f"\n{BLUE}3. POLITICIAN INFO{RESET}") print("-" * 80) result = self.validate_count( name="Unique Politicians in PostgreSQL", pg_query="SELECT COUNT(DISTINCT politician_id) as count FROM core_politicianinfo", neo4j_query=""" MATCH (p:Politician) WHERE p.email IS NOT NULL OR p.phone IS NOT NULL OR p.twitter_id IS NOT NULL RETURN count(p) as count """ ) self.results.append(result) self._print_result(result) # Check specific info fields were added result = ValidationResult( name="Politicians with enriched data", passed=True, message="Checking email, phone, twitter fields...", details={} ) enriched_stats = self.neo4j.run_query(""" MATCH (p:Politician) RETURN count(CASE WHEN p.email IS NOT NULL THEN 1 END) as has_email, count(CASE WHEN p.phone IS NOT NULL THEN 1 END) as has_phone, count(CASE WHEN p.twitter_id IS NOT NULL THEN 1 END) as has_twitter, count(CASE WHEN p.parl_id IS NOT NULL THEN 1 END) as has_parl_id, count(p) as total """) if enriched_stats: stats = enriched_stats[0] result.details = stats result.message = f"Email: {stats['has_email']:,} | Phone: {stats['has_phone']:,} | Twitter: {stats['has_twitter']:,}" self.results.append(result) self._print_result(result) # 4. Elections/Candidacies print(f"\n{BLUE}4. ELECTION CANDIDACIES{RESET}") print("-" * 80) result = self.validate_count( name="Candidacy Count", pg_query="SELECT COUNT(*) as count FROM elections_candidacy", neo4j_query="MATCH (c:Candidacy) RETURN count(c) as count" ) self.results.append(result) self._print_result(result) result = self.validate_relationship_coverage( name="Candidacies with RAN_IN relationship", neo4j_node_query="MATCH (c:Candidacy) RETURN count(c) as count", neo4j_relationship_query=""" MATCH ()-[:RAN_IN]->(c:Candidacy) RETURN count(DISTINCT c) as count """, min_coverage_percent=80.0 # Lower threshold - not all candidates may be in Politician table ) self.results.append(result) self._print_result(result) # 5. Overall Relationship Integrity print(f"\n{BLUE}5. RELATIONSHIP INTEGRITY{RESET}") print("-" * 80) # Check for broken relationships (relationships pointing to non-existent nodes) relationship_checks = [ ("PART_OF relationships", "MATCH ()-[r:PART_OF]->() RETURN count(r) as count"), ("HAS_TEXT relationships", "MATCH ()-[r:HAS_TEXT]->() RETURN count(r) as count"), ("RAN_IN relationships", "MATCH ()-[r:RAN_IN]->() RETURN count(r) as count"), ("MADE_BY relationships", "MATCH ()-[r:MADE_BY]->() RETURN count(r) as count"), ] for rel_name, query in relationship_checks: result_data = self.neo4j.run_query(query) count = result_data[0]['count'] if result_data else 0 result = ValidationResult( name=rel_name, passed=True, message=f"Found {count:,} relationships", details={'count': count} ) self.results.append(result) self._print_result(result) # Final Summary self._print_summary() def _print_result(self, result: ValidationResult): """Print a single validation result.""" status = f"{GREEN}✓ PASS{RESET}" if result.passed else f"{RED}✗ FAIL{RESET}" print(f" {status} | {result.name}") print(f" {result.message}") def _print_summary(self): """Print validation summary.""" print("\n" + "=" * 80) print(f"{BLUE}VALIDATION SUMMARY{RESET}") print("=" * 80) total = len(self.results) passed = sum(1 for r in self.results if r.passed) failed = total - passed print(f"\nTotal Checks: {total}") print(f"{GREEN}Passed: {passed}{RESET}") if failed > 0: print(f"{RED}Failed: {failed}{RESET}") if failed == 0: print(f"\n{GREEN}{'=' * 80}") print("✅ ALL VALIDATIONS PASSED - DATA INTEGRITY CONFIRMED") print(f"{'=' * 80}{RESET}") else: print(f"\n{RED}{'=' * 80}") print(f"⚠️ {failed} VALIDATION(S) FAILED - REVIEW REQUIRED") print(f"{'=' * 80}{RESET}") print(f"\n{YELLOW}Failed Checks:{RESET}") for result in self.results: if not result.passed: print(f" • {result.name}: {result.message}") return failed == 0 def main(): """Run validation checks.""" # Parse PostgreSQL URI postgres_uri = os.getenv("POSTGRES_URI", "postgresql://fedmcp:fedmcp2024@localhost:5432/openparliament") parts = postgres_uri.replace("postgresql://", "").split("@") user_pass = parts[0].split(":") host_db = parts[1].split("/") host_port = host_db[0].split(":") # Initialize clients print("Connecting to databases...") postgres_client = PostgresClient( dbname=host_db[1], user=user_pass[0], password=user_pass[1], host=host_port[0], port=int(host_port[1]) if len(host_port) > 1 else 5432 ) neo4j_client = Neo4jClient( uri=os.getenv("NEO4J_URI", "bolt://localhost:7687"), user=os.getenv("NEO4J_USER", "neo4j"), password=os.getenv("NEO4J_PASSWORD", "canadagpt2024") ) print(f" {GREEN}✓{RESET} Connected to PostgreSQL") print(f" {GREEN}✓{RESET} Connected to Neo4j") # Run validations validator = DataValidator(postgres_client, neo4j_client) success = validator.run_all_validations() # Cleanup postgres_client.close() neo4j_client.close() # Exit with appropriate code sys.exit(0 if success else 1) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/northernvariables/FedMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server