#!/usr/bin/env python3
"""
Comprehensive Neo4j Database Analysis for CanadaGPT
Analyzes Session 45-1 data with deep focus on November 2025:
- Data completeness (missing dates, coverage gaps)
- Data quality (MP linking rates, timestamp validation)
- Relationship integrity (orphaned nodes, broken links)
- Schema health (indexes, constraints, duplicates)
- November-specific issues (corruption patterns, anomalies)
Outputs:
- TXT report (human-readable terminal format)
- JSON report (machine-readable for programmatic analysis)
- Automated fix recommendations (index commands; --apply-fixes execution is planned, not yet implemented)
Run:
export NEO4J_URI=bolt://localhost:7687
export NEO4J_PASSWORD=canadagpt2024
python scripts/comprehensive_data_analysis.py
# Planned (not yet implemented):
python scripts/comprehensive_data_analysis.py --apply-fixes
"""
import sys
import os
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any
# Add packages to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'packages' / 'data-pipeline'))
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.progress import logger
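# Assumed Neo4jClient interface (fedmcp_pipeline.utils.neo4j_client): the
# analyzers below rely only on run_query(cypher) returning a list of dict-like
# records keyed by RETURN aliases, and on close() releasing the driver.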
# ============================================
# 1. DATA COMPLETENESS ANALYZER
# ============================================
class DataCompletenessAnalyzer:
"""Analyzes data completeness for Session 45-1 and November 2025."""
def __init__(self, neo4j: Neo4jClient):
self.neo4j = neo4j
self.results = {}
def analyze(self) -> Dict[str, Any]:
"""Run all completeness analyses."""
logger.info("=" * 80)
logger.info("DATA COMPLETENESS ANALYSIS")
logger.info("=" * 80)
self.results['session_coverage'] = self._analyze_session_coverage()
self.results['november_coverage'] = self._analyze_november_coverage()
self.results['missing_dates'] = self._find_missing_sitting_days()
return self.results
def _analyze_session_coverage(self) -> Dict[str, Any]:
"""Analyze Session 45-1 overall coverage."""
logger.info("Analyzing Session 45-1 coverage...")
query = """
MATCH (s:Session {id: '45-1'})
OPTIONAL MATCH (d:Document)-[:FROM_SESSION]->(s)
OPTIONAL MATCH (v:Vote)-[:FROM_SESSION]->(s)
OPTIONAL MATCH (b:Bill) WHERE b.session = '45-1'
OPTIONAL MATCH (m:Meeting) WHERE m.session = '45-1'
OPTIONAL MATCH (stmt:Statement)-[:PART_OF]->(d)
RETURN
count(DISTINCT d) as document_count,
count(DISTINCT v) as vote_count,
count(DISTINCT b) as bill_count,
count(DISTINCT m) as meeting_count,
count(DISTINCT stmt) as statement_count
"""
result = self.neo4j.run_query(query)
if not result:
return {'error': 'No data found for Session 45-1', 'parliament': 45, 'session': 1,
'documents_total': 0, 'documents_debates': 0, 'documents_evidence': 0,
'statements': 0, 'votes': 0, 'bills': 0, 'meetings': 0}
row = result[0]
# Get document type breakdown
doc_type_query = """
MATCH (d:Document {parliament_number: 45, session_number: 1})
RETURN d.document_type as type, count(*) as count
"""
doc_types = self.neo4j.run_query(doc_type_query)
debates = sum(r['count'] for r in doc_types if r['type'] == 'D')
evidence = sum(r['count'] for r in doc_types if r['type'] == 'E')
return {
'parliament': 45,
'session': 1,
'documents_total': row['document_count'],
'documents_debates': debates,
'documents_evidence': evidence,
'statements': row['statement_count'],
'votes': row['vote_count'],
'bills': row['bill_count'],
'meetings': row['meeting_count']
}
def _analyze_november_coverage(self) -> Dict[str, Any]:
"""Analyze November 2025 daily coverage."""
logger.info("Analyzing November 2025 coverage...")
query = """
MATCH (d:Document)
WHERE d.date STARTS WITH '2025-11'
OPTIONAL MATCH (s:Statement)-[:PART_OF]->(d)
OPTIONAL MATCH (s)-[:MADE_BY]->(mp:MP)
WITH d, count(s) as stmt_count, count(mp) as linked_count
RETURN
d.id as doc_id,
d.date as date,
d.number as number,
stmt_count,
linked_count,
CASE WHEN stmt_count > 0
THEN 100.0 * linked_count / stmt_count
ELSE 0
END as link_rate
ORDER BY d.date
"""
results = self.neo4j.run_query(query)
documents = []
for row in results:
documents.append({
'id': row['doc_id'],
'date': row['date'],
'number': row['number'],
'statements': row['stmt_count'],
'linked': row['linked_count'],
'link_rate': round(row['link_rate'], 2) if row['link_rate'] else 0
})
return {
'documents_found': len(documents),
'documents': documents
}
def _find_missing_sitting_days(self) -> List[str]:
"""Find missing November sitting days based on expected House calendar."""
logger.info("Finding missing sitting days in November 2025...")
# Get all dates we have
query = """
MATCH (d:Document)
WHERE d.date STARTS WITH '2025-11'
RETURN DISTINCT d.date as date
ORDER BY date
"""
results = self.neo4j.run_query(query)
existing_dates = {row['date'] for row in results}
# Candidate sitting days: every weekday. Constituency/break weeks are NOT
# excluded here, so treat the result as an upper bound on truly missing days
# (see the exclude_known_breaks sketch after this class).
missing_dates = []
for day in range(1, 31): # November has 30 days
date = datetime(2025, 11, day)
if date.weekday() < 5: # Monday-Friday
date_str = date.strftime('%Y-%m-%d')
if date_str not in existing_dates:
missing_dates.append(date_str)
return sorted(missing_dates)
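# Hedged sketch of break-week filtering for _find_missing_sitting_days().
# KNOWN_BREAKS_NOV_2025 is a placeholder (assumed Remembrance Week constituency
# break); confirm against the official House of Commons sitting calendar.
KNOWN_BREAKS_NOV_2025 = {
    '2025-11-10', '2025-11-11', '2025-11-12', '2025-11-13', '2025-11-14'
}
def exclude_known_breaks(dates: List[str], breaks=KNOWN_BREAKS_NOV_2025) -> List[str]:
    """Drop candidate dates that fall in a known non-sitting week."""
    return [d for d in dates if d not in breaks]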
# ============================================
# 2. DATA QUALITY ANALYZER
# ============================================
class DataQualityAnalyzer:
"""Analyzes data quality and linking success rates."""
def __init__(self, neo4j: Neo4jClient):
self.neo4j = neo4j
self.results = {}
def analyze(self) -> Dict[str, Any]:
"""Run all quality analyses."""
logger.info("=" * 80)
logger.info("DATA QUALITY ANALYSIS")
logger.info("=" * 80)
self.results['statement_linking'] = self._analyze_statement_linking()
self.results['vote_linking'] = self._analyze_vote_linking()
self.results['committee_completeness'] = self._analyze_committee_completeness()
self.results['timestamp_validation'] = self._validate_timestamps()
self.results['mp_metadata'] = self._analyze_mp_metadata()
return self.results
def _analyze_statement_linking(self) -> Dict[str, Any]:
"""Analyze Statement → MP linking success rates."""
logger.info("Analyzing Statement → MP linking...")
query = """
MATCH (d:Document)
WHERE d.date STARTS WITH '2025-11'
MATCH (s:Statement)-[:PART_OF]->(d)
WITH count(s) as total
MATCH (s2:Statement)-[:PART_OF]->(d2:Document)
WHERE d2.date STARTS WITH '2025-11' AND s2.person_db_id IS NOT NULL
WITH total, count(s2) as with_db_id
MATCH (s3:Statement)-[:PART_OF]->(d3:Document)
WHERE d3.date STARTS WITH '2025-11'
MATCH (s3)-[:MADE_BY]->(mp:MP)
RETURN total, with_db_id, count(s3) as linked,
100.0 * with_db_id / total as db_id_rate,
100.0 * count(s3) / total as link_rate
"""
result = self.neo4j.run_query(query)
if not result or result[0]['total'] == 0:
return {
'total_statements': 0,
'with_person_db_id': 0,
'linked_to_mp': 0,
'orphaned': 0,
'db_id_rate': 0.0,
'link_rate': 0.0
}
row = result[0]
# Find orphaned statements
orphaned_query = """
MATCH (d:Document)
WHERE d.date STARTS WITH '2025-11'
MATCH (s:Statement)-[:PART_OF]->(d)
WHERE NOT (s)-[:MADE_BY]->(:MP)
RETURN count(s) as orphaned_count
"""
orphaned_result = self.neo4j.run_query(orphaned_query)
orphaned = orphaned_result[0]['orphaned_count'] if orphaned_result else 0
return {
'total_statements': row['total'],
'with_person_db_id': row['with_db_id'],
'linked_to_mp': row['linked'],
'orphaned': orphaned,
'db_id_rate': round(row['db_id_rate'], 2) if row['total'] > 0 else 0.0,
'link_rate': round(row['link_rate'], 2) if row['total'] > 0 else 0.0
}
def _analyze_vote_linking(self) -> Dict[str, Any]:
"""Analyze Vote → MP linking via Ballots."""
logger.info("Analyzing Vote → MP linking...")
query = """
MATCH (v:Vote)
WHERE v.parliament_number = 45 AND v.session_number = 1
MATCH (b:Ballot)-[:CAST_IN]->(v)
WITH count(b) as total
MATCH (b2:Ballot)-[:CAST_IN]->(v2:Vote)
WHERE v2.parliament_number = 45 AND v2.session_number = 1
AND b2.person_id IS NOT NULL
WITH total, count(b2) as with_person_id
MATCH (b3:Ballot)-[:CAST_IN]->(v3:Vote)
WHERE v3.parliament_number = 45 AND v3.session_number = 1
MATCH (b3)-[:CAST_BY]->(mp:MP)
RETURN total, with_person_id, count(b3) as linked,
100.0 * with_person_id / total as person_id_rate,
100.0 * count(b3) / total as link_rate
"""
result = self.neo4j.run_query(query)
if not result or result[0]['total'] == 0:
return {
'total_ballots': 0,
'with_person_id': 0,
'linked_to_mp': 0,
'person_id_rate': 0.0,
'link_rate': 0.0
}
row = result[0]
return {
'total_ballots': row['total'],
'with_person_id': row['with_person_id'],
'linked_to_mp': row['linked'],
'person_id_rate': round(row['person_id_rate'], 2),
'link_rate': round(row['link_rate'], 2)
}
def _analyze_committee_completeness(self) -> Dict[str, Any]:
"""Analyze Committee Meeting → Evidence completeness."""
logger.info("Analyzing Committee Evidence completeness...")
query = """
MATCH (m:Meeting)
WHERE m.date STARTS WITH '2025-11'
WITH count(m) as total_meetings
MATCH (m2:Meeting)-[:HAS_EVIDENCE]->(ce:CommitteeEvidence)
WHERE m2.date STARTS WITH '2025-11'
WITH total_meetings, count(DISTINCT m2) as with_evidence
MATCH (ct:CommitteeTestimony)-[:GIVEN_IN]->(ce2:CommitteeEvidence)
WHERE ce2.date STARTS WITH '2025-11'
WITH total_meetings, with_evidence, count(ct) as total_testimonies
MATCH (ct2:CommitteeTestimony)-[:GIVEN_IN]->(ce3:CommitteeEvidence)
WHERE ce3.date STARTS WITH '2025-11'
MATCH (ct2)-[:TESTIMONY_BY]->(mp:MP)
RETURN total_meetings, with_evidence, total_testimonies, count(ct2) as linked_testimonies,
CASE WHEN total_meetings > 0
THEN 100.0 * with_evidence / total_meetings
ELSE 0
END as evidence_rate,
CASE WHEN total_testimonies > 0
THEN 100.0 * count(ct2) / total_testimonies
ELSE 0
END as testimony_link_rate
"""
result = self.neo4j.run_query(query)
if not result or result[0]['total_meetings'] == 0:
return {'total_meetings': 0}
row = result[0]
return {
'total_meetings': row['total_meetings'],
'with_evidence': row['with_evidence'],
'evidence_rate': round(row['evidence_rate'], 2),
'total_testimonies': row['total_testimonies'],
'linked_testimonies': row['linked_testimonies'],
'testimony_link_rate': round(row['testimony_link_rate'], 2)
}
def _validate_timestamps(self) -> Dict[str, Any]:
"""Validate Statement.time format for malformed timestamps."""
logger.info("Validating timestamp formats...")
# Check for string time values (malformed)
query = """
MATCH (d:Document)
WHERE d.date STARTS WITH '2025-11'
WITH collect(d.id) as nov_doc_ids
MATCH (s:Statement)
WHERE s.document_id IN nov_doc_ids AND s.time IS NOT NULL
WITH s
WHERE NOT toString(s.time) =~ '.*T\\d{2}:\\d{2}:\\d{2}.*'
RETURN count(s) as malformed_count,
collect(s.id)[0..5] as sample_ids
"""
result = self.neo4j.run_query(query)
return {
'malformed_timestamps': result[0]['malformed_count'] if result else 0,
'sample_malformed_ids': result[0]['sample_ids'] if result else []
}
def _analyze_mp_metadata(self) -> Dict[str, Any]:
"""Analyze MP metadata completeness."""
logger.info("Analyzing MP metadata completeness...")
query = """
MATCH (mp:MP {current: true})
WITH count(mp) as total
MATCH (mp2:MP {current: true})
WHERE mp2.hansard_db_id IS NOT NULL
WITH total, count(mp2) as with_hansard_id
MATCH (mp3:MP {current: true})
WHERE mp3.parl_mp_id IS NOT NULL
RETURN total, with_hansard_id, count(mp3) as with_parl_id,
100.0 * with_hansard_id / total as hansard_id_rate,
100.0 * count(mp3) / total as parl_id_rate
"""
result = self.neo4j.run_query(query)
if not result:
return {'total_current_mps': 0, 'with_hansard_db_id': 0, 'with_parl_mp_id': 0,
'hansard_id_coverage': 0.0, 'parl_id_coverage': 0.0}
row = result[0]
return {
'total_current_mps': row['total'],
'with_hansard_db_id': row['with_hansard_id'],
'with_parl_mp_id': row['with_parl_id'],
'hansard_id_coverage': round(row['hansard_id_rate'], 2),
'parl_id_coverage': round(row['parl_id_rate'], 2)
}
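# Hedged repair sketch for the malformed timestamps flagged by
# _validate_timestamps() and NovemberDeepDiveAnalyzer._detect_corruption()
# (values such as "14:35 (...)" with a parenthetical suffix). The split/trim
# approach is an assumption about the corruption shape; inspect
# sample_malformed_ids before running it by hand:
FIX_MALFORMED_TIME_CYPHER = """
MATCH (s:Statement)
WHERE s.time IS NOT NULL AND toString(s.time) CONTAINS '('
SET s.time = trim(split(toString(s.time), '(')[0])
RETURN count(s) as repaired
"""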
# ============================================
# 3. RELATIONSHIP INTEGRITY ANALYZER
# ============================================
class RelationshipIntegrityAnalyzer:
"""Analyzes relationship integrity and orphaned nodes."""
def __init__(self, neo4j: Neo4jClient):
self.neo4j = neo4j
self.results = {}
def analyze(self) -> Dict[str, Any]:
"""Run all integrity analyses."""
logger.info("=" * 80)
logger.info("RELATIONSHIP INTEGRITY ANALYSIS")
logger.info("=" * 80)
self.results['orphaned_statements'] = self._find_orphaned_statements()
self.results['orphaned_ballots'] = self._find_orphaned_ballots()
self.results['empty_documents'] = self._find_empty_documents()
self.results['silent_mps'] = self._find_silent_mps()
self.results['cardinality_metrics'] = self._analyze_cardinality()
return self.results
def _find_orphaned_statements(self) -> Dict[str, Any]:
"""Find statements without MP links."""
logger.info("Finding orphaned statements...")
query = """
MATCH (d:Document)
WHERE d.date STARTS WITH '2025-11'
MATCH (s:Statement)-[:PART_OF]->(d)
WHERE NOT (s)-[:MADE_BY]->(:MP)
RETURN count(s) as count,
collect(DISTINCT s.person_db_id)[0..10] as sample_person_db_ids
"""
result = self.neo4j.run_query(query)
return {
'count': result[0]['count'] if result else 0,
'sample_person_db_ids': result[0]['sample_person_db_ids'] if result else []
}
def _find_orphaned_ballots(self) -> Dict[str, Any]:
"""Find ballots without MP links."""
logger.info("Finding orphaned ballots...")
query = """
MATCH (b:Ballot)
WHERE NOT (b)-[:CAST_BY]->(:MP)
RETURN count(b) as count
"""
result = self.neo4j.run_query(query)
return {
'count': result[0]['count'] if result else 0
}
def _find_empty_documents(self) -> Dict[str, Any]:
"""Find documents without statements."""
logger.info("Finding empty documents...")
query = """
MATCH (d:Document)
WHERE NOT (d)<-[:PART_OF]-(:Statement)
RETURN count(d) as count,
collect(d.id)[0..5] as sample_ids
"""
result = self.neo4j.run_query(query)
return {
'count': result[0]['count'] if result else 0,
'sample_ids': result[0]['sample_ids'] if result else []
}
def _find_silent_mps(self) -> Dict[str, Any]:
"""Find MPs without SPOKE_AT relationships in Session 45-1."""
logger.info("Finding silent MPs in Session 45-1...")
query = """
MATCH (mp:MP {current: true})
WHERE NOT (mp)-[:SPOKE_AT]->(:Document {parliament_number: 45, session_number: 1})
RETURN count(mp) as count,
collect(mp.name)[0..10] as sample_names
"""
result = self.neo4j.run_query(query)
return {
'count': result[0]['count'] if result else 0,
'sample_names': result[0]['sample_names'] if result else []
}
def _analyze_cardinality(self) -> Dict[str, Any]:
"""Analyze relationship cardinality metrics."""
logger.info("Analyzing relationship cardinality...")
# Average statements per document
doc_query = """
MATCH (d:Document)<-[:PART_OF]-(s:Statement)
WITH d, count(s) as stmt_count
RETURN avg(stmt_count) as avg_statements_per_doc
"""
# Average ballots per vote
vote_query = """
MATCH (v:Vote)<-[:CAST_IN]-(b:Ballot)
WITH v, count(b) as ballot_count
RETURN avg(ballot_count) as avg_ballots_per_vote
"""
doc_result = self.neo4j.run_query(doc_query)
vote_result = self.neo4j.run_query(vote_query)
return {
'avg_statements_per_document': round(doc_result[0]['avg_statements_per_doc'] or 0, 2) if doc_result else 0,
'avg_ballots_per_vote': round(vote_result[0]['avg_ballots_per_vote'] or 0, 2) if vote_result else 0
}
# ============================================
# 4. SCHEMA HEALTH ANALYZER
# ============================================
class SchemaHealthAnalyzer:
"""Analyzes schema health (indexes, constraints, duplicates)."""
def __init__(self, neo4j: Neo4jClient):
self.neo4j = neo4j
self.results = {}
def analyze(self) -> Dict[str, Any]:
"""Run all schema analyses."""
logger.info("=" * 80)
logger.info("SCHEMA HEALTH ANALYSIS")
logger.info("=" * 80)
self.results['indexes'] = self._analyze_indexes()
self.results['duplicate_mps'] = self._find_duplicate_mps()
self.results['recommended_indexes'] = self._recommend_indexes()
return self.results
def _analyze_indexes(self) -> List[Dict[str, Any]]:
"""Get existing indexes and constraints."""
logger.info("Analyzing existing indexes...")
query = """
SHOW INDEXES
YIELD name, labelsOrTypes, properties, type
RETURN name, labelsOrTypes, properties, type
"""
result = self.neo4j.run_query(query)
indexes = []
for row in result:
indexes.append({
'name': row['name'],
'labels': row['labelsOrTypes'],
'properties': row['properties'],
'type': row['type']
})
return indexes
def _find_duplicate_mps(self) -> List[Dict[str, Any]]:
"""Find MP nodes with duplicate names."""
logger.info("Finding duplicate MP nodes...")
query = """
MATCH (mp:MP)
WITH mp.name as name, collect(mp) as duplicates
WHERE size(duplicates) > 1
RETURN name,
[d in duplicates | d.id] as ids,
size(duplicates) as count
ORDER BY count DESC
LIMIT 10
"""
result = self.neo4j.run_query(query)
duplicates = []
for row in result:
duplicates.append({
'name': row['name'],
'ids': row['ids'],
'count': row['count']
})
return duplicates
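# Possible consolidation path for the duplicates reported above (assumption:
# the APOC plugin is installed; apoc.refactor.mergeNodes collapses each node
# list into its first element). Illustrative only; vet every duplicate set
# first, since two MPs can legitimately share a name:
#   MATCH (mp:MP)
#   WITH mp.name as name, collect(mp) as nodes
#   WHERE size(nodes) > 1
#   CALL apoc.refactor.mergeNodes(nodes, {properties: 'discard', mergeRels: true})
#   YIELD node
#   RETURN name, node.id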
def _recommend_indexes(self) -> List[Dict[str, Any]]:
"""Recommend missing indexes on join keys."""
logger.info("Recommending missing indexes...")
# Check if these indexes already exist
existing_indexes = self._analyze_indexes()
existing_props = set()
for idx in existing_indexes:
if idx.get('properties'):
for prop in idx['properties']:
existing_props.add(prop)
recommended = []
# Recommended indexes for performance
if 'person_db_id' not in existing_props:
recommended.append({
'index_name': 'statement_person_db_id',
'node_type': 'Statement',
'property': 'person_db_id',
'reason': 'Used for MP linking in Hansard imports',
'command': 'CREATE INDEX statement_person_db_id FOR (s:Statement) ON (s.person_db_id)'
})
if 'person_id' not in existing_props:
recommended.append({
'index_name': 'ballot_person_id',
'node_type': 'Ballot',
'property': 'person_id',
'reason': 'Used for MP linking in Vote imports',
'command': 'CREATE INDEX ballot_person_id FOR (b:Ballot) ON (b.person_id)'
})
return recommended
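# Hedged sketch of the planned --apply-fixes path; main() does not call this
# yet. It only executes the 'command' strings that _recommend_indexes() emits.
def apply_recommended_fixes(neo4j: Neo4jClient, recommendations: List[Dict[str, Any]]) -> int:
    """Execute recommended index commands; returns the number applied."""
    applied = 0
    for rec in recommendations:
        logger.info(f"Creating index {rec['index_name']} on {rec['node_type']}.{rec['property']}")
        neo4j.run_query(rec['command'])  # plain CREATE INDEX; errors if it already exists
        applied += 1
    return applied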
# ============================================
# 5. NOVEMBER DEEP DIVE ANALYZER
# ============================================
class NovemberDeepDiveAnalyzer:
"""Deep dive analysis of November 2025 data."""
def __init__(self, neo4j: Neo4jClient):
self.neo4j = neo4j
self.results = {}
def analyze(self) -> Dict[str, Any]:
"""Run November-specific analyses."""
logger.info("=" * 80)
logger.info("NOVEMBER 2025 DEEP DIVE")
logger.info("=" * 80)
self.results['daily_analysis'] = self._analyze_daily_data()
self.results['unlinked_speakers'] = self._find_unlinked_speakers()
self.results['corruption_patterns'] = self._detect_corruption()
return self.results
def _analyze_daily_data(self) -> List[Dict[str, Any]]:
"""Analyze each November day in detail."""
logger.info("Analyzing daily November data...")
query = """
MATCH (d:Document)
WHERE d.date STARTS WITH '2025-11'
OPTIONAL MATCH (s:Statement)-[:PART_OF]->(d)
OPTIONAL MATCH (s)-[:MADE_BY]->(mp:MP)
WITH d, count(s) as total_stmts, count(mp) as linked_stmts,
count(DISTINCT mp) as unique_speakers
RETURN
d.date as date,
d.id as doc_id,
d.number as sitting_number,
total_stmts,
linked_stmts,
unique_speakers,
CASE WHEN total_stmts > 0
THEN 100.0 * linked_stmts / total_stmts
ELSE 0
END as link_rate
ORDER BY d.date
"""
results = self.neo4j.run_query(query)
daily_data = []
for row in results:
daily_data.append({
'date': row['date'],
'doc_id': row['doc_id'],
'sitting_number': row['sitting_number'],
'total_statements': row['total_stmts'],
'linked_statements': row['linked_stmts'],
'unique_speakers': row['unique_speakers'],
'link_rate': round(row['link_rate'], 2)
})
return daily_data
def _find_unlinked_speakers(self) -> List[Dict[str, Any]]:
"""Find top unlinked speakers by person_db_id."""
logger.info("Finding top unlinked speakers...")
query = """
MATCH (d:Document)
WHERE d.date STARTS WITH '2025-11'
MATCH (s:Statement)-[:PART_OF]->(d)
WHERE s.person_db_id IS NOT NULL
AND NOT (s)-[:MADE_BY]->(:MP)
RETURN s.person_db_id as person_db_id,
s.who_en as speaker_name,
count(*) as statement_count
ORDER BY statement_count DESC
LIMIT 20
"""
result = self.neo4j.run_query(query)
unlinked = []
for row in result:
unlinked.append({
'person_db_id': row['person_db_id'],
'speaker_name': row['speaker_name'],
'statement_count': row['statement_count']
})
return unlinked
def _detect_corruption(self) -> Dict[str, Any]:
"""Detect data corruption patterns."""
logger.info("Detecting data corruption patterns...")
# Check for malformed timestamps
malformed_query = """
MATCH (d:Document)
WHERE d.date STARTS WITH '2025-11'
MATCH (s:Statement)-[:PART_OF]->(d)
WHERE s.time IS NOT NULL
AND toString(s.time) CONTAINS '('
RETURN count(s) as malformed_count
"""
malformed = self.neo4j.run_query(malformed_query)
return {
'malformed_timestamps': malformed[0]['malformed_count'] if malformed else 0
}
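# Hedged relink sketch for the unlinked statements surfaced above. Assumption:
# Statement.person_db_id corresponds to MP.hansard_db_id, as the linking
# analyses in this script imply. Review _find_unlinked_speakers() output
# before running anything like this by hand:
RELINK_STATEMENTS_CYPHER = """
MATCH (s:Statement)
WHERE s.person_db_id IS NOT NULL AND NOT (s)-[:MADE_BY]->(:MP)
MATCH (mp:MP {hansard_db_id: s.person_db_id})
MERGE (s)-[:MADE_BY]->(mp)
RETURN count(s) as relinked
"""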
# ============================================
# REPORT GENERATORS
# ============================================
class TXTReportGenerator:
"""Generate human-readable TXT report."""
@staticmethod
def generate(analyses: Dict[str, Any], filename: str):
"""Generate TXT report file."""
logger.info(f"Generating TXT report: {filename}")
with open(filename, 'w', encoding='utf-8') as f:
f.write("═" * 80 + "\n")
f.write("CANADAGPT NEO4J DATABASE ANALYSIS REPORT\n")
f.write("Session 45-1 Analysis with November 2025 Deep Dive\n")
f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S ET')}\n")
f.write("═" * 80 + "\n\n")
# Executive Summary
f.write("EXECUTIVE SUMMARY\n")
f.write("─" * 80 + "\n")
TXTReportGenerator._write_executive_summary(f, analyses)
# Data Completeness
f.write("\n" + "═" * 80 + "\n")
f.write("1. DATA COMPLETENESS ANALYSIS\n")
f.write("═" * 80 + "\n\n")
TXTReportGenerator._write_completeness(f, analyses['completeness'])
# Data Quality
f.write("\n" + "═" * 80 + "\n")
f.write("2. DATA QUALITY ANALYSIS\n")
f.write("═" * 80 + "\n\n")
TXTReportGenerator._write_quality(f, analyses['quality'])
# Relationship Integrity
f.write("\n" + "═" * 80 + "\n")
f.write("3. RELATIONSHIP INTEGRITY ANALYSIS\n")
f.write("═" * 80 + "\n\n")
TXTReportGenerator._write_integrity(f, analyses['integrity'])
# Schema Health
f.write("\n" + "═" * 80 + "\n")
f.write("4. SCHEMA HEALTH ANALYSIS\n")
f.write("═" * 80 + "\n\n")
TXTReportGenerator._write_schema(f, analyses['schema'])
# November Deep Dive
f.write("\n" + "═" * 80 + "\n")
f.write("5. NOVEMBER 2025 DEEP DIVE\n")
f.write("═" * 80 + "\n\n")
TXTReportGenerator._write_november(f, analyses['november'])
f.write("\n" + "═" * 80 + "\n")
f.write("END OF REPORT\n")
f.write("═" * 80 + "\n")
logger.success(f"✓ TXT report saved: {filename}")
@staticmethod
def _write_executive_summary(f, analyses):
"""Write executive summary section."""
# Calculate health indicators
issues = []
warnings = []
# Check for critical issues
completeness = analyses['completeness']
if completeness['missing_dates']:
issues.append(f"{len(completeness['missing_dates'])} missing November sitting days")
quality = analyses['quality']
stmt_link_rate = quality['statement_linking'].get('link_rate', 0)
if stmt_link_rate < 98:
if stmt_link_rate < 90:
issues.append(f"Statement linking rate {stmt_link_rate}% (target: 98%+)")
else:
warnings.append(f"Statement linking rate {stmt_link_rate}% (target: 98%+)")
integrity = analyses['integrity']
orphaned = integrity['orphaned_statements']['count']
if orphaned > 0:
issues.append(f"{orphaned:,} orphaned statements")
# Overall health
if issues:
f.write("Overall Health: ✗ NEEDS ATTENTION\n")
elif warnings:
f.write("Overall Health: ⚠ WARNINGS\n")
else:
f.write("Overall Health: ✓ GOOD\n")
f.write(f"Critical Issues: {len(issues)}\n")
f.write(f"Warnings: {len(warnings)}\n\n")
f.write("Key Findings:\n")
for issue in issues:
f.write(f" ✗ CRITICAL: {issue}\n")
for warning in warnings:
f.write(f" ⚠ WARNING: {warning}\n")
if not issues and not warnings:
f.write(" ✓ All checks passed\n")
@staticmethod
def _write_completeness(f, data):
"""Write data completeness section."""
session = data['session_coverage']
f.write("Session 45-1 Coverage\n")
f.write("─" * 40 + "\n")
f.write(f" Parliament: {session['parliament']}\n")
f.write(f" Session: {session['session']}\n")
f.write(f" Documents: {session['documents_total']:,} ")
f.write(f"(Debates: {session['documents_debates']:,}, Evidence: {session['documents_evidence']:,})\n")
f.write(f" Statements: {session['statements']:,}\n")
f.write(f" Votes: {session['votes']:,}\n")
f.write(f" Bills: {session['bills']:,}\n")
f.write(f" Meetings: {session['meetings']:,}\n\n")
november = data['november_coverage']
f.write(f"November 2025 Coverage\n")
f.write("─" * 40 + "\n")
f.write(f" Documents Found: {november['documents_found']}\n")
f.write(f" Missing Dates: {len(data['missing_dates'])}\n\n")
for doc in november['documents'][:10]: # Show first 10
status = "✓" if doc['link_rate'] >= 95 else "⚠"
f.write(f" {status} {doc['date']} - Doc {doc['id']} - ")
f.write(f"{doc['statements']:,} statements - {doc['link_rate']}% linked\n")
if data['missing_dates']:
f.write(f"\n Missing dates:\n")
for date in data['missing_dates'][:10]:
day_name = datetime.strptime(date, '%Y-%m-%d').strftime('%a')
f.write(f" ✗ {date} ({day_name})\n")
@staticmethod
def _write_quality(f, data):
"""Write data quality section."""
stmt = data['statement_linking']
f.write("Statement → MP Linking\n")
f.write("─" * 40 + "\n")
f.write(f" Total Statements: {stmt['total_statements']:,}\n")
f.write(f" With person_db_id: {stmt['with_person_db_id']:,} ({stmt['db_id_rate']}%)\n")
f.write(f" Linked to MP: {stmt['linked_to_mp']:,} ({stmt['link_rate']}%)\n")
f.write(f" Orphaned: {stmt['orphaned']:,}\n\n")
vote = data['vote_linking']
f.write("Vote → MP Linking\n")
f.write("─" * 40 + "\n")
f.write(f" Total Ballots: {vote['total_ballots']:,}\n")
f.write(f" Linked to MP: {vote['linked_to_mp']:,} ({vote['link_rate']}%)\n\n")
mp = data['mp_metadata']
f.write("MP Metadata Completeness\n")
f.write("─" * 40 + "\n")
f.write(f" Current MPs: {mp['total_current_mps']}\n")
f.write(f" With hansard_db_id: {mp['with_hansard_db_id']} ({mp['hansard_id_coverage']}%)\n")
f.write(f" With parl_mp_id: {mp['with_parl_mp_id']} ({mp['parl_id_coverage']}%)\n")
@staticmethod
def _write_integrity(f, data):
"""Write relationship integrity section."""
f.write(f"Orphaned Statements: {data['orphaned_statements']['count']:,}\n")
f.write(f"Orphaned Ballots: {data['orphaned_ballots']['count']:,}\n")
f.write(f"Empty Documents: {data['empty_documents']['count']:,}\n")
f.write(f"Silent MPs (no SPOKE_AT): {data['silent_mps']['count']:,}\n\n")
cardinality = data['cardinality_metrics']
f.write("Relationship Cardinality:\n")
f.write(f" Avg Statements per Document: {cardinality['avg_statements_per_document']}\n")
f.write(f" Avg Ballots per Vote: {cardinality['avg_ballots_per_vote']}\n")
@staticmethod
def _write_schema(f, data):
"""Write schema health section."""
f.write(f"Existing Indexes: {len(data['indexes'])}\n\n")
for idx in data['indexes'][:10]:
f.write(f" - {idx['name']}: {idx['labels']} on {idx['properties']}\n")
if data['duplicate_mps']:
f.write(f"\nDuplicate MP Nodes: {len(data['duplicate_mps'])}\n")
for dup in data['duplicate_mps']:
f.write(f" - {dup['name']}: {dup['count']} nodes\n")
if data['recommended_indexes']:
f.write(f"\nRecommended Indexes: {len(data['recommended_indexes'])}\n")
for rec in data['recommended_indexes']:
f.write(f" - {rec['index_name']}: {rec['reason']}\n")
@staticmethod
def _write_november(f, data):
"""Write November deep dive section."""
daily = data['daily_analysis']
f.write(f"Daily Analysis ({len(daily)} days):\n\n")
for day in daily:
status = "✓" if day['link_rate'] >= 95 else "⚠"
f.write(f" {status} {day['date']}: {day['total_statements']:,} statements, ")
f.write(f"{day['unique_speakers']} speakers, {day['link_rate']}% linked\n")
unlinked = data['unlinked_speakers']
if unlinked:
f.write(f"\nTop Unlinked Speakers ({len(unlinked)}):\n")
for speaker in unlinked[:10]:
f.write(f" - {speaker['speaker_name']} (DbId={speaker['person_db_id']}): ")
f.write(f"{speaker['statement_count']} statements\n")
class JSONReportGenerator:
"""Generate machine-readable JSON report."""
@staticmethod
def generate(analyses: Dict[str, Any], filename: str):
"""Generate JSON report file."""
logger.info(f"Generating JSON report: {filename}")
report = {
'metadata': {
'generated_at': datetime.now().isoformat(),
'session': '45-1',
'focus_month': '2025-11',
'database_uri': os.getenv('NEO4J_URI', 'unknown')
},
'analyses': analyses
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, default=str)
logger.success(f"✓ JSON report saved: {filename}")
# ============================================
# MAIN
# ============================================
def main():
"""Main entry point."""
logger.info("=" * 80)
logger.info("COMPREHENSIVE NEO4J DATABASE ANALYSIS")
logger.info("Session 45-1 with November 2025 Deep Dive")
logger.info("=" * 80)
print()
# Connect to Neo4j
neo4j_uri = os.getenv('NEO4J_URI', 'bolt://localhost:7687')
neo4j_user = os.getenv('NEO4J_USERNAME', 'neo4j')
neo4j_password = os.getenv('NEO4J_PASSWORD')
if not neo4j_password:
logger.error("NEO4J_PASSWORD environment variable not set")
logger.info("Run: export NEO4J_PASSWORD=your_password")
sys.exit(1)
logger.info(f"Connecting to Neo4j at {neo4j_uri}...")
neo4j = Neo4jClient(uri=neo4j_uri, user=neo4j_user, password=neo4j_password)
try:
# Run all analyses
analyses = {}
completeness = DataCompletenessAnalyzer(neo4j)
analyses['completeness'] = completeness.analyze()
quality = DataQualityAnalyzer(neo4j)
analyses['quality'] = quality.analyze()
integrity = RelationshipIntegrityAnalyzer(neo4j)
analyses['integrity'] = integrity.analyze()
schema = SchemaHealthAnalyzer(neo4j)
analyses['schema'] = schema.analyze()
november = NovemberDeepDiveAnalyzer(neo4j)
analyses['november'] = november.analyze()
# Generate reports
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
txt_filename = f"data_analysis_report_{timestamp}.txt"
json_filename = f"data_analysis_report_{timestamp}.json"
TXTReportGenerator.generate(analyses, txt_filename)
JSONReportGenerator.generate(analyses, json_filename)
logger.info("=" * 80)
logger.success("✅ ANALYSIS COMPLETE")
logger.info("=" * 80)
logger.info(f"Reports generated:")
logger.info(f" - TXT: {txt_filename}")
logger.info(f" - JSON: {json_filename}")
except Exception as e:
logger.error(f"Analysis failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
finally:
neo4j.close()
if __name__ == "__main__":
main()