FedMCP - Federal Parliamentary Information

analyze_conversation_threads.py (15.2 kB)
#!/usr/bin/env python3
"""
Analyze Hansard statements and create conversation threading relationships.

This script infers conversation threads from existing statement data by analyzing:
- statement_type: "question", "answer", "interjection"
- time: chronological proximity within topics
- h2_en/h2_fr: topic grouping
- document_id: debate session grouping

It creates REPLIES_TO relationships in Neo4j and populates:
- thread_id: Unique identifier for conversation groups
- parent_statement_id: Which statement this replies to
- sequence_in_thread: Order within conversation (0 = root)

Usage:
    python scripts/analyze_conversation_threads.py [--limit N] [--document-id ID] [--dry-run]

Options:
    --limit N: Only process first N statements (for testing)
    --document-id ID: Process only specific document (for testing)
    --dry-run: Show what would be created without making changes
    --time-threshold SECONDS: Max time between statements in same thread (default: 300)
"""

import sys
import os
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from collections import defaultdict
import uuid

# Add packages to path
SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = SCRIPT_DIR.parent
sys.path.insert(0, str(PROJECT_ROOT / "packages" / "data-pipeline"))

from dotenv import load_dotenv
from fedmcp_pipeline.utils.neo4j_client import Neo4jClient
from fedmcp_pipeline.utils.postgres_client import PostgresClient
from fedmcp_pipeline.utils.config import Config


class ConversationAnalyzer:
    """Analyzes statements and creates conversation threading."""

    def __init__(
        self,
        neo4j_client: Neo4jClient,
        postgres_client: PostgresClient,
        time_threshold: int = 300,  # 5 minutes in seconds
        dry_run: bool = False
    ):
        self.neo4j = neo4j_client
        self.postgres = postgres_client
        self.time_threshold = time_threshold
        self.dry_run = dry_run
        self.stats = {
            'total_statements': 0,
            'threads_created': 0,
            'replies_created': 0,
            'root_statements': 0,
        }

    def fetch_statements_for_analysis(
        self,
        document_id: Optional[int] = None,
        limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Fetch statements from PostgreSQL ordered by document, topic, and time.
        """
        where_clauses = ["time < '4000-01-01'"]  # Exclude corrupted dates
        params = []

        if document_id:
            where_clauses.append("document_id = %s")
            params.append(document_id)

        where_sql = " AND " + " AND ".join(where_clauses) if where_clauses else ""
        limit_sql = f"LIMIT {limit}" if limit else ""

        query = f"""
            SELECT
                id, document_id, time, politician_id,
                h1_en, h2_en, h3_en,
                statement_type, wordcount, procedural, who_en
            FROM hansards_statement
            WHERE 1=1 {where_sql}
            ORDER BY document_id, h2_en, time
            {limit_sql}
        """

        print("Fetching statements from PostgreSQL...")
        statements = self.postgres.execute_query(
            query,
            params=tuple(params) if params else None,
            dict_cursor=True
        )
        print(f"Fetched {len(statements)} statements")

        self.stats['total_statements'] = len(statements)
        return statements

    def group_statements_by_context(
        self,
        statements: List[Dict[str, Any]]
    ) -> Dict[Tuple[int, str], List[Dict[str, Any]]]:
        """
        Group statements by (document_id, h2_en) to identify conversation contexts.
        """
        groups = defaultdict(list)

        for stmt in statements:
            # Skip statements without document or topic
            if not stmt.get('document_id') or not stmt.get('h2_en'):
                continue
            key = (stmt['document_id'], stmt['h2_en'])
            groups[key].append(stmt)

        print(f"Grouped into {len(groups)} conversation contexts")
        return groups

    def analyze_group_for_threads(
        self,
        statements: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Analyze a group of statements and identify conversation threads.

        Returns list of thread structures:
        [
            {
                'thread_id': 'uuid',
                'root_statement_id': 12345,
                'statements': [
                    {'id': 12345, 'sequence': 0, 'parent_id': None},
                    {'id': 12346, 'sequence': 1, 'parent_id': 12345},
                    {'id': 12347, 'sequence': 2, 'parent_id': 12345},
                ]
            },
            ...
        ]
        """
        if not statements:
            return []

        threads = []
        current_thread = None
        last_question_time = None
        last_question_id = None

        for stmt in statements:
            stmt_id = stmt['id']
            stmt_type = stmt.get('statement_type', '').lower()
            stmt_time = stmt.get('time')

            # Questions always start new threads
            if stmt_type == 'question':
                # Finalize previous thread
                if current_thread:
                    threads.append(current_thread)

                # Start new thread
                thread_id = str(uuid.uuid4())
                current_thread = {
                    'thread_id': thread_id,
                    'root_statement_id': stmt_id,
                    'statements': [
                        {'id': stmt_id, 'sequence': 0, 'parent_id': None}
                    ]
                }
                last_question_time = stmt_time
                last_question_id = stmt_id
                self.stats['root_statements'] += 1

            # Answers and interjections may belong to current thread
            elif stmt_type in ('answer', 'interjection') and current_thread:
                # Check time proximity
                time_diff = None
                if stmt_time and last_question_time:
                    time_diff = (stmt_time - last_question_time).total_seconds()

                # Add to thread if within time threshold
                if time_diff is not None and time_diff <= self.time_threshold:
                    current_thread['statements'].append({
                        'id': stmt_id,
                        'sequence': len(current_thread['statements']),
                        'parent_id': last_question_id  # Reply to the question
                    })
                else:
                    # Too far apart, start new thread
                    if current_thread:
                        threads.append(current_thread)

                    thread_id = str(uuid.uuid4())
                    current_thread = {
                        'thread_id': thread_id,
                        'root_statement_id': stmt_id,
                        'statements': [
                            {'id': stmt_id, 'sequence': 0, 'parent_id': None}
                        ]
                    }
                    last_question_time = stmt_time
                    last_question_id = stmt_id
                    self.stats['root_statements'] += 1

            # Other statement types start standalone threads
            elif stmt_type not in ('answer', 'interjection'):
                # Finalize previous thread
                if current_thread:
                    threads.append(current_thread)

                # Single-statement thread
                thread_id = str(uuid.uuid4())
                current_thread = {
                    'thread_id': thread_id,
                    'root_statement_id': stmt_id,
                    'statements': [
                        {'id': stmt_id, 'sequence': 0, 'parent_id': None}
                    ]
                }
                last_question_time = stmt_time
                last_question_id = stmt_id
                self.stats['root_statements'] += 1

        # Finalize last thread
        if current_thread:
            threads.append(current_thread)

        return threads

    def create_threading_relationships(
        self,
        threads: List[Dict[str, Any]]
    ) -> int:
        """
        Create REPLIES_TO relationships and update thread fields in Neo4j.

        Returns number of relationships created.
        """
        relationships_created = 0

        for thread in threads:
            thread_id = thread['thread_id']
            statements = thread['statements']

            # Update each statement with thread metadata
            for stmt in statements:
                stmt_id = str(stmt['id'])
                sequence = stmt['sequence']
                parent_id = str(stmt['parent_id']) if stmt['parent_id'] else None

                # Update statement node
                update_query = """
                    MATCH (s:Statement {id: $stmt_id})
                    SET s.thread_id = $thread_id,
                        s.sequence_in_thread = $sequence,
                        s.parent_statement_id = $parent_id
                """
                if not self.dry_run:
                    self.neo4j.run_query(update_query, {
                        'stmt_id': stmt_id,
                        'thread_id': thread_id,
                        'sequence': sequence,
                        'parent_id': parent_id
                    })

                # Create REPLIES_TO relationship if this is a reply
                if parent_id:
                    reply_query = """
                        MATCH (child:Statement {id: $child_id})
                        MATCH (parent:Statement {id: $parent_id})
                        MERGE (child)-[:REPLIES_TO]->(parent)
                    """
                    if not self.dry_run:
                        self.neo4j.run_query(reply_query, {
                            'child_id': stmt_id,
                            'parent_id': parent_id
                        })
                    relationships_created += 1

        self.stats['replies_created'] = relationships_created
        return relationships_created

    def analyze_and_thread(
        self,
        document_id: Optional[int] = None,
        limit: Optional[int] = None
    ):
        """
        Main method to analyze statements and create threading.
        """
        print("\n" + "=" * 80)
        print("HANSARD CONVERSATION THREADING ANALYSIS")
        print("=" * 80)
        print(f"Mode: {'DRY RUN' if self.dry_run else 'LIVE'}")
        print(f"Time threshold: {self.time_threshold} seconds")
        if document_id:
            print(f"Document filter: {document_id}")
        if limit:
            print(f"Statement limit: {limit}")
        print()

        # Step 1: Fetch statements
        statements = self.fetch_statements_for_analysis(document_id, limit)
        if not statements:
            print("No statements to process.")
            return

        # Step 2: Group by conversation context
        groups = self.group_statements_by_context(statements)

        # Step 3: Analyze each group for threads
        all_threads = []
        for (doc_id, topic), group_statements in groups.items():
            threads = self.analyze_group_for_threads(group_statements)
            all_threads.extend(threads)
            print(f"  Document {doc_id} / {topic[:50]}... → {len(threads)} threads")

        self.stats['threads_created'] = len(all_threads)
        print(f"\nTotal threads identified: {len(all_threads)}")

        # Step 4: Create relationships in Neo4j
        if not self.dry_run:
            print("\nCreating threading relationships in Neo4j...")
            relationships_created = self.create_threading_relationships(all_threads)
            print(f"Created {relationships_created} REPLIES_TO relationships")
        else:
            print("\nDRY RUN - Would create threading relationships")
            # Calculate how many would be created
            total_replies = sum(
                len([s for s in thread['statements'] if s['parent_id']])
                for thread in all_threads
            )
            print(f"Would create {total_replies} REPLIES_TO relationships")

        # Print statistics
        print("\n" + "=" * 80)
        print("STATISTICS")
        print("=" * 80)
        print(f"Total statements analyzed: {self.stats['total_statements']:,}")
        print(f"Conversation contexts: {len(groups):,}")
        print(f"Threads created: {self.stats['threads_created']:,}")
        print(f"Root statements: {self.stats['root_statements']:,}")
        print(f"Reply relationships: {self.stats['replies_created']:,}")

        avg_thread_size = (
            self.stats['total_statements'] / self.stats['threads_created']
            if self.stats['threads_created'] > 0 else 0
        )
        print(f"Average thread size: {avg_thread_size:.1f} statements")
        print("=" * 80)


def main():
    parser = argparse.ArgumentParser(
        description="Analyze conversation threads in Hansard statements"
    )
    parser.add_argument(
        '--limit',
        type=int,
        help='Only process first N statements (for testing)'
    )
    parser.add_argument(
        '--document-id',
        type=int,
        help='Process only specific document (for testing)'
    )
    parser.add_argument(
        '--time-threshold',
        type=int,
        default=300,
        help='Max seconds between statements in same thread (default: 300)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be created without making changes'
    )

    args = parser.parse_args()

    # Load environment
    load_dotenv(PROJECT_ROOT / "packages" / "data-pipeline" / ".env")

    # Initialize clients
    env_file = PROJECT_ROOT / "packages" / "data-pipeline" / ".env"
    config = Config(env_file=env_file)

    # PostgreSQL connection parameters
    pg_host = os.getenv("POSTGRES_HOST", "localhost")
    pg_port = int(os.getenv("POSTGRES_PORT", "5432"))
    pg_db = os.getenv("POSTGRES_DB", "openparliament")
    pg_user = os.getenv("POSTGRES_USER", "fedmcp")
    pg_password = os.getenv("POSTGRES_PASSWORD", "fedmcp2024")

    neo4j = Neo4jClient(
        config.neo4j_uri,
        config.neo4j_user,
        config.neo4j_password
    )

    # PostgresClient signature: dbname, user, password, host, port
    postgres = PostgresClient(
        dbname=pg_db,
        user=pg_user,
        password=pg_password,
        host=pg_host,
        port=pg_port
    )

    try:
        # Create analyzer
        analyzer = ConversationAnalyzer(
            neo4j,
            postgres,
            time_threshold=args.time_threshold,
            dry_run=args.dry_run
        )

        # Run analysis
        analyzer.analyze_and_thread(
            document_id=args.document_id,
            limit=args.limit
        )
    finally:
        # Cleanup
        neo4j.close()
        postgres.close()

    print("\n✅ Threading analysis complete!")


if __name__ == "__main__":
    main()
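After a live run, the results can be spot-checked by reading one thread back from Neo4j through the same thread_id, sequence_in_thread, and parent_statement_id properties the script writes. The following is a minimal verification sketch, not part of the repository: it reuses the script's Neo4jClient.run_query call and assumes that call returns an iterable of dict-like records (the script itself never consumes run_query's return value, so confirm this against the client before relying on it).

# verify_thread.py: hypothetical companion snippet, not part of the FedMCP repository.
# Prints one conversation thread written by analyze_conversation_threads.py.
# ASSUMPTION: Neo4jClient.run_query returns an iterable of dict-like records;
# the main script never reads its return value, so check your client's API first.

THREAD_QUERY = """
    MATCH (s:Statement {thread_id: $thread_id})
    RETURN s.id AS id,
           s.sequence_in_thread AS sequence,
           s.parent_statement_id AS parent_id
    ORDER BY s.sequence_in_thread
"""

def print_thread(neo4j, thread_id: str) -> None:
    """Print a thread's statements in conversation order (root first)."""
    for record in neo4j.run_query(THREAD_QUERY, {'thread_id': thread_id}):
        indent = "    " if record['parent_id'] else ""
        parent = record['parent_id'] or "none (root)"
        print(f"{indent}[{record['sequence']}] statement {record['id']} -> replies to {parent}")

Note that a --dry-run pass writes nothing to Neo4j, so this check is only meaningful after a live run.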
