Skip to main content
Glama
process_existing_114th.py14.2 kB
#!/usr/bin/env python3
"""
Process Existing 114th Congress XML Files

Parses already-downloaded bill XML files and loads the extracted metadata
and section content into a SQLite database.
"""

import os
import sqlite3
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Optional
import logging
from datetime import datetime
import re

# Import schemas for validation
# NOTE(review): no schema module is imported in this file, and the path below
# is machine-specific — confirm whether it is still required.
import sys
sys.path.append('/home/cbwinslow/Documents/opendiscourse_mcp')


class Existing114thProcessor:
    """Extract metadata and sections from bill XML files and store them.

    Attributes are set in ``__init__``:
        data_dir: Root directory that contains the ``BILLS`` tree.
        db_path: Path handed to ``sqlite3.connect``.
        logger: Logger named ``Existing114thProcessor``.
    """

    # Hierarchy depth stored in bill_sections.level for each element type.
    _SECTION_LEVELS = {
        'section': 1,
        'subsection': 2,
        'paragraph': 3,
        'subparagraph': 4,
    }

    # Element types extracted from each bill, in the order they are emitted.
    _CONTENT_TAGS = ('section', 'subsection', 'paragraph')

    # A letter-led prefix of letters, dots and spaces, then the bill number,
    # e.g. "S. 1163", "H. R. 366", "H. CON. RES. 10".
    _LEGIS_NUM_PATTERN = re.compile(r'\s*([A-Za-z][A-Za-z.\s]*?)\s*(\d+)')

    # Log file opened by _setup_logging.
    _LOG_FILE = "logs/existing_114th_processor.log"

    def __init__(self, data_dir: str, db_path: str):
        self.data_dir = Path(data_dir)
        self.db_path = db_path
        self._setup_logging()

    def _setup_logging(self):
        """Configure logging to the console and to the processor log file."""
        # FileHandler does not create missing directories, so make sure the
        # log directory exists before the handler opens the file.
        Path(self._LOG_FILE).parent.mkdir(parents=True, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(),
                logging.FileHandler(self._LOG_FILE, mode='a', encoding='utf-8')
            ]
        )
        self.logger = logging.getLogger("Existing114thProcessor")

    def _extract_text(self, parent, tag: str) -> Optional[str]:
        """Return the stripped leading text of the first ``tag`` child.

        Returns None when the child is missing or has no leading text.
        """
        element = parent.find(tag)
        if element is not None and element.text:
            return element.text.strip()
        return None

    def _extract_attribute(self, element, attr: str) -> Optional[str]:
        """Return attribute ``attr`` of ``element``, or None if unavailable."""
        if element is not None:
            return element.get(attr)
        return None

    @staticmethod
    def _flatten_text(element) -> str:
        """Return all text inside ``element`` with whitespace collapsed."""
        return ' '.join(''.join(element.itertext()).split())

    def _parse_date(self, date_str: Optional[str]) -> Optional[str]:
        """Normalise a date string to ``YYYY-MM-DD``.

        Accepts ``YYYYMMDD`` (digits only), strings that already contain a
        hyphen (returned stripped but otherwise unchanged), and long-form
        dates such as ``April 30, 2015``.

        Returns:
            The normalised date, or None when the input is empty or in an
            unrecognised format.
        """
        if not date_str:
            return None
        value = date_str.strip()
        # Require digits: other 8-character strings (e.g. "May 2015") must
        # not be sliced into a bogus date.
        if len(value) == 8 and value.isdigit():
            return f"{value[:4]}-{value[4:6]}-{value[6:8]}"
        if '-' in value:
            return value
        try:
            return datetime.strptime(value, '%B %d, %Y').strftime('%Y-%m-%d')
        except ValueError:
            return None

    @classmethod
    def _parse_legis_num(cls, legis_num: Optional[str]) -> Optional[tuple]:
        """Split a legislative number into ``(bill_type, bill_number)``.

        The type is the letters of the prefix, lower-cased, so
        ``"H. R. 366"`` gives ``("hr", 366)`` and ``"H. CON. RES. 10"``
        gives ``("hconres", 10)``.

        Returns:
            The ``(bill_type, bill_number)`` pair, or None when the string
            does not look like a legislative number.
        """
        if not legis_num:
            return None
        match = cls._LEGIS_NUM_PATTERN.match(legis_num)
        if match is None:
            return None
        bill_type = re.sub(r'[^A-Za-z]', '', match.group(1)).lower()
        return bill_type, int(match.group(2))

    def process_xml_file(self, xml_path: Path) -> Dict:
        """Parse one XML file and extract its metadata and content.

        Returns:
            A dict with ``metadata``, ``content`` and ``success`` keys. On
            any failure the error is logged and the dict carries
            ``success=False``, empty ``metadata``/``content`` and an
            ``error`` message instead of raising.
        """
        try:
            root = ET.parse(xml_path).getroot()
            return {
                'metadata': self._extract_metadata(root, xml_path),
                'content': self._extract_legislative_content(root),
                'success': True
            }
        except Exception as e:
            # Deliberate catch-all: one bad file must not stop a batch run.
            self.logger.error(f"Error processing {xml_path}: {str(e)}")
            return {
                'metadata': {},
                'content': [],
                'success': False,
                'error': str(e)
            }

    def _extract_metadata(self, root: ET.Element, xml_path: Path) -> Dict:
        """Extract bill metadata from the document root.

        Form-derived keys are only present when the root has a ``form``
        child; ``bill_stage`` and ``file_path`` are always set.
        """
        metadata: Dict = {}

        form = root.find('form')
        if form is not None:
            self._extract_form_metadata(form, metadata)

        metadata['bill_stage'] = self._extract_attribute(root, 'bill-stage')

        # Store the path relative to the parent of the data directory when
        # possible; a file that lives elsewhere keeps its path as given
        # instead of failing the whole file.
        try:
            stored_path = xml_path.relative_to(self.data_dir.parent)
        except ValueError:
            stored_path = xml_path
        metadata['file_path'] = str(stored_path)

        return metadata

    def _extract_form_metadata(self, form: ET.Element, metadata: Dict) -> None:
        """Fill ``metadata`` in place from the ``form`` element."""
        # "114th CONGRESS" -> 114
        congress_text = self._extract_text(form, 'congress')
        if congress_text:
            congress_match = re.search(r'(\d+)', congress_text)
            if congress_match:
                metadata['congress'] = int(congress_match.group(1))

        # "1st Session" -> 1
        session_text = self._extract_text(form, 'session')
        if session_text:
            session_match = re.search(r'(\d+)', session_text)
            if session_match:
                metadata['session'] = int(session_match.group(1))

        metadata['legis_num'] = self._extract_text(form, 'legis-num')
        metadata['current_chamber'] = self._extract_text(form, 'current-chamber')
        metadata['official_title'] = self._extract_text(form, 'official-title')

        # Substring checks such as "'S.' in legis_num" would also catch every
        # "... RES." number, so the prefix is parsed as a whole instead.
        parsed = self._parse_legis_num(metadata['legis_num'])
        if parsed is not None:
            metadata['bill_type'], metadata['bill_number'] = parsed
            metadata['bill_id'] = f"{metadata['bill_type']}{metadata['bill_number']}"

        action = form.find('action')
        if action is None:
            return

        sponsor = action.find('.//sponsor')
        if sponsor is not None:
            metadata['sponsor_name_id'] = self._extract_attribute(sponsor, 'name-id')
            metadata['sponsor_name'] = sponsor.text.strip() if sponsor.text else None

        date_element = action.find('action-date')
        if date_element is not None:
            # NOTE(review): the `date` attribute is assumed to hold the
            # machine-readable YYYYMMDD form — confirm against the bill DTD.
            # The element text is used when the attribute is absent or
            # unparseable.
            candidates = [
                value for value in (date_element.get('date'), date_element.text)
                if value and value.strip()
            ]
            if candidates:
                metadata['introduced_date'] = next(
                    (parsed_date
                     for parsed_date in map(self._parse_date, candidates)
                     if parsed_date),
                    None
                )

    def _content_enum(self, element: ET.Element) -> Optional[str]:
        """Return the ``enum`` attribute, else the ``enum`` child's text."""
        enum_attribute = self._extract_attribute(element, 'enum')
        if enum_attribute is not None:
            return enum_attribute
        return self._extract_text(element, 'enum')

    def _content_text(self, element: ET.Element) -> Optional[str]:
        """Return the element's leading text, else its ``text`` child's text.

        When neither yields anything, the leading-text result is returned
        (None if there is no leading text, '' if it is only whitespace).
        """
        leading = element.text.strip() if element.text else None
        if leading:
            return leading
        body = element.find('text')
        if body is not None:
            flattened = self._flatten_text(body)
            if flattened:
                return flattened
        return leading

    def _extract_legislative_content(self, root: ET.Element) -> List[Dict]:
        """Extract sections, subsections and paragraphs from the document.

        Returns:
            One dict per element with ``id``, ``element_type``, ``enum``,
            ``header`` and ``text`` keys. All sections come first, then all
            subsections, then all paragraphs.
        """
        content = []
        for tag in self._CONTENT_TAGS:
            for element in root.findall(f'.//{tag}'):
                content.append({
                    'id': self._extract_attribute(element, 'id'),
                    'element_type': tag,
                    'enum': self._content_enum(element),
                    'header': self._extract_text(element, 'header'),
                    'text': self._content_text(element)
                })
        return content

    def store_in_database(self, processed_data: Dict, xml_path: Path):
        """Store one processed bill in the database.

        Writes to the ``bills``, ``bill_sections`` and ``legislators``
        tables in a single transaction.

        Returns:
            True when the bill was committed; False when it was skipped
            because processing failed or no ``bill_id`` could be derived.

        Raises:
            Exception: any database error, re-raised after rollback.
        """
        if not processed_data['success']:
            self.logger.error(f"Skipping database storage for {xml_path} due to processing error")
            return False

        metadata = processed_data['metadata']
        content = processed_data['content']

        bill_id = metadata.get('bill_id')
        if not bill_id:
            # Without a bill_id the sections could not be tied to their bill.
            self.logger.warning(f"Skipping database storage for {xml_path}: no bill_id could be determined")
            return False

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        try:
            # title, sponsor_party and sponsor_state are not populated by
            # _extract_metadata in this file, so they are stored as NULL.
            cursor.execute("""
                INSERT OR REPLACE INTO bills
                (congress, session, bill_type, bill_number, bill_id, title, official_title,
                 sponsor_name_id, sponsor_name, sponsor_party, sponsor_state,
                 introduced_date, current_chamber, bill_stage, file_path)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                metadata.get('congress'),
                metadata.get('session'),
                metadata.get('bill_type'),
                metadata.get('bill_number'),
                bill_id,
                metadata.get('title'),
                metadata.get('official_title'),
                metadata.get('sponsor_name_id'),
                metadata.get('sponsor_name'),
                metadata.get('sponsor_party'),
                metadata.get('sponsor_state'),
                metadata.get('introduced_date'),
                metadata.get('current_chamber'),
                metadata.get('bill_stage'),
                metadata.get('file_path')
            ))

            for i, section in enumerate(content):
                cursor.execute("""
                    INSERT OR REPLACE INTO bill_sections
                    (bill_id, section_id, section_type, section_number, header, content, level, order_index)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    bill_id,
                    section.get('id'),
                    section.get('element_type'),
                    section.get('enum'),
                    section.get('header'),
                    section.get('text'),
                    self._get_section_level(section.get('element_type')),
                    i
                ))

            if metadata.get('sponsor_name_id') and metadata.get('sponsor_name'):
                cursor.execute("""
                    INSERT OR REPLACE INTO legislators
                    (name_id, full_name)
                    VALUES (?, ?)
                """, (
                    metadata.get('sponsor_name_id'),
                    metadata.get('sponsor_name')
                ))

            conn.commit()
            self.logger.info(f"✅ Stored {bill_id} in database")
            return True

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Database error storing bill: {str(e)}")
            raise
        finally:
            conn.close()

    def _get_section_level(self, element_type: str) -> int:
        """Return the hierarchy level for ``element_type`` (1 if unknown)."""
        return self._SECTION_LEVELS.get(element_type, 1)

    def process_all_files(self):
        """Process every XML file under ``BILLS/bulkdata/BILLS/114``.

        Walks session directories ``1`` and ``2`` and each bill-type
        directory inside them, in sorted order. A file counts as processed
        only when it was parsed and stored; parse failures, skipped bills
        and database errors count as errors.
        """
        bills_dir = self.data_dir / "BILLS" / "bulkdata" / "BILLS" / "114"

        if not bills_dir.exists():
            self.logger.error(f"114th Congress directory not found: {bills_dir}")
            return

        processed_count = 0
        error_count = 0

        for session_dir in sorted(bills_dir.iterdir()):
            if not (session_dir.is_dir() and session_dir.name in ('1', '2')):
                continue
            self.logger.info(f"Processing Session {session_dir.name}")

            for bill_type_dir in sorted(session_dir.iterdir()):
                if not bill_type_dir.is_dir():
                    continue
                self.logger.info(f"Processing {bill_type_dir.name} bills")

                xml_files = sorted(bill_type_dir.glob("*.xml"))
                self.logger.info(f"Found {len(xml_files)} XML files in {bill_type_dir.name}")

                for xml_file in xml_files:
                    try:
                        processed_data = self.process_xml_file(xml_file)
                        stored = self.store_in_database(processed_data, xml_file)
                    except Exception as e:
                        self.logger.error(f"Error processing {xml_file}: {str(e)}")
                        error_count += 1
                        continue

                    if not stored:
                        # Already logged by process_xml_file/store_in_database.
                        error_count += 1
                        continue

                    processed_count += 1
                    if processed_count % 100 == 0:
                        self.logger.info(f"Processed {processed_count} files")

        self.logger.info(f"Processing complete: {processed_count} files processed, {error_count} errors")


def main():
    """Parse command-line arguments and process all files."""
    import argparse

    parser = argparse.ArgumentParser(description="Process existing 114th Congress XML files")
    parser.add_argument('--data-dir', default="govinfo_data",
                        help="Data directory containing XML files")
    parser.add_argument('--db-path', default="data/govinfo_downloads.db",
                        help="Database path for storing processed data")

    args = parser.parse_args()

    processor = Existing114thProcessor(args.data_dir, args.db_path)
    processor.process_all_files()


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cbwinslow/opendiscourse_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.