Skip to main content
Glama
ingestor.py23.3 kB
"""Main ingestion logic for govinfo.gov bulk data.""" import asyncio import json import logging import time from datetime import datetime from pathlib import Path import aiofiles import aiohttp from tqdm import tqdm from .config import ( BASE_URL, CHUNK_SIZE, CONGRESS_SESSIONS, DOCUMENT_TYPES, LOG_LEVEL, MAX_RETRIES, OUTPUT_DIR, RATE_LIMIT, REQUEST_TIMEOUT, RETRY_DELAY, VALIDATE_XML, WORKERS, ) from .rate_limiter import RateLimiter from .xml_validator import XMLValidator # Set up logging logging.basicConfig(level=LOG_LEVEL) logger = logging.getLogger(__name__) class GovInfoIngestor: """Handles downloading and processing of govinfo.gov bulk data.""" def __init__( self, output_dir: Path = OUTPUT_DIR, workers: int = WORKERS, timeout: int = REQUEST_TIMEOUT, max_retries: int = MAX_RETRIES, rate_limit: int = RATE_LIMIT, chunk_size: int = CHUNK_SIZE, validate_xml: bool = VALIDATE_XML, ): """Initialize the ingestor with configuration. Args: output_dir: Directory to save downloaded files workers: Number of parallel downloads timeout: Request timeout in seconds max_retries: Maximum number of retry attempts rate_limit: Maximum requests per second chunk_size: Chunk size for downloads in bytes validate_xml: Whether to validate XML against schemas """ self.output_dir = Path(output_dir) self.workers = workers self.timeout = aiohttp.ClientTimeout(total=timeout) self.max_retries = max_retries self.rate_limit = rate_limit self.chunk_size = chunk_size self.validate_xml = validate_xml self.semaphore = asyncio.Semaphore(workers) # Initialize rate limiter self.rate_limiter = RateLimiter(rate_limit, capacity=rate_limit * 2) # Initialize XML validator if needed self.validator = XMLValidator() if validate_xml else None # Map document types to schema names self.schema_map = { "BILLS": "bills", "BILLSTATUS": "billstatus", "PLAW": "plaw", "STATUTE": "statute", "FR": "federalregister", "CREC": "crec", } # Ensure output directory exists self.output_dir.mkdir(parents=True, exist_ok=True) async def validate_downloaded_file(self, file_path: Path, doc_type: str) -> bool: """Validate a downloaded XML file against its schema. Args: file_path: Path to the downloaded file doc_type: Document type for schema lookup Returns: bool: True if valid, False otherwise """ if not self.validator or doc_type not in self.schema_map: return True try: content = file_path.read_text(encoding="utf-8") schema_name = self.schema_map[doc_type] is_valid, errors = self.validator.validate_xml(content, schema_name) if not is_valid: logger.error(f"XML validation failed for {file_path}: {errors[:3]}") if len(errors) > 3: logger.error(f"... and {len(errors) - 3} more validation errors") return False return True except Exception as e: logger.error(f"Error during XML validation for {file_path}: {str(e)}") return False async def download_file( self, session: aiohttp.ClientSession, url: str, output_path: Path, doc_type: str, retries: int = 0, ) -> bool: """Download a single file with retries and validation.""" if output_path.exists(): logger.info(f"File exists, skipping: {output_path}") return True async with self.semaphore: for attempt in range(retries + 1): try: # Apply rate limiting await self.rate_limiter.acquire() # Make the request async with session.get(url, timeout=self.timeout) as response: if response.status == 200: # Create parent directory if it doesn't exist output_path.parent.mkdir(parents=True, exist_ok=True) # Download the file async with aiofiles.open(output_path, "wb") as f: await f.write(await response.read()) # Validate if needed if self.validate_xml and doc_type in self.schema_map: is_valid = await self.validate_downloaded_file( output_path, doc_type ) if not is_valid: output_path.unlink() # Remove invalid file logger.error( f"Removed invalid XML file: {output_path}" ) return False logger.debug(f"Downloaded and validated: {output_path}") return True elif response.status == 404: logger.warning(f"Not found (404): {url}") return False else: logger.warning( f"Failed to download {url}: {response.status}" ) except (aiohttp.ClientError, asyncio.TimeoutError) as e: logger.warning( f"Error downloading {url} (attempt {attempt + 1}): {e}" ) if attempt < retries: await asyncio.sleep(RETRY_DELAY * (attempt + 1)) continue except Exception as e: logger.error(f"Unexpected error downloading {url}: {e}") return False return False async def get_document_list( self, session: aiohttp.ClientSession, congress: int, doc_type: str, ) -> list[str]: """Recursively retrieve all XML file URLs for a given congress and document type. Uses govinfo bulkdata XML/JSON directory listings. """ base = f"{BASE_URL}/{doc_type}/{congress}/" async def list_dir(url: str, depth: int = 0) -> list[str]: files: list[str] = [] try: # Prefer XML listing endpoint listing_xml = url.replace("/bulkdata/", "/bulkdata/xml/") async with session.get( listing_xml, timeout=self.timeout, headers={"Accept": "application/xml,*/*"}, ) as resp: if resp.status == 200: text = await resp.text() try: import xml.etree.ElementTree as ET root = ET.fromstring(text) for f in root.findall(".//file"): link_el = f.find("link") is_folder_el = f.find("folder") link = link_el.text if link_el is not None else None is_folder = ( (is_folder_el.text.lower() == "true") if is_folder_el is not None else False ) if not link: continue if is_folder: # Recurse into subdirectory sub_files = await list_dir(link, depth + 1) files.extend(sub_files) else: if link.lower().endswith(".xml"): files.append(link) except Exception: # Fall back to JSON listing endpoint listing_json = url.replace("/bulkdata/", "/bulkdata/json/") async with session.get( listing_json, timeout=self.timeout, headers={"Accept": "application/json"}, ) as jresp: if jresp.status == 200: data = await jresp.json() def walk(node) -> list[str]: acc: list[str] = [] if isinstance(node, dict): # JSON formats vary; look for keys if ( node.get("folder") is True and "link" in node ): acc.extend(walk(node.get("files", []))) elif ( node.get("folder") is False and "link" in node ): link = node["link"] if isinstance( link, str ) and link.lower().endswith(".xml"): acc.append(link) else: for v in node.values(): acc.extend(walk(v)) elif isinstance(node, list): for item in node: acc.extend(walk(item)) return acc files.extend(walk(data)) else: logger.warning( f"Directory listing not available: {listing_json} ({jresp.status})" ) else: logger.warning( f"Directory listing not available: {listing_xml} ({resp.status})" ) except Exception as e: logger.error(f"Error listing {url}: {e}") return files return await list_dir(base) async def process_document_type( self, session: aiohttp.ClientSession, congress: int, doc_type: str, ) -> int: """Process all documents of a specific type for a given congress.""" logger.info(f"Processing {doc_type} for Congress {congress}") # Get list of documents documents = await self.get_document_list(session, congress, doc_type) if not documents: logger.warning(f"No documents found for {doc_type} in Congress {congress}") return 0 # Ensure output directory exists and snapshot existing files doc_dir = self.output_dir / str(congress) / doc_type doc_dir.mkdir(parents=True, exist_ok=True) before_files = {p.name for p in doc_dir.glob('*.xml')} # Download documents with URL mapping for failure tracking tasks_map: dict[asyncio.Task, str] = {} for doc_url in documents: filename = doc_url.rstrip("/").split("/")[-1] output_path = doc_dir / filename task = asyncio.create_task( self.download_file( session, doc_url, output_path, doc_type, retries=self.max_retries ) ) tasks_map[task] = doc_url attempted = len(tasks_map) success_count = 0 failure_count = 0 failed_urls: list[str] = [] for f in tqdm( asyncio.as_completed(list(tasks_map.keys())), total=attempted, desc=f"{doc_type}-{congress}", unit="file", ): ok = await f if ok: success_count += 1 else: failure_count += 1 failed_urls.append(tasks_map[f]) # Manifest and failures after_files = {p.name for p in doc_dir.glob('*.xml')} new_files = sorted(after_files - before_files) total_files_in_dir = len(after_files) total_bytes_in_dir = sum(p.stat().st_size for p in doc_dir.glob('*.xml')) manifest = { 'congress': congress, 'doc_type': doc_type, 'attempted': attempted, 'succeeded': success_count, 'failed': failure_count, 'new_files_count': len(new_files), 'dir_total_files': total_files_in_dir, 'dir_total_bytes': total_bytes_in_dir, 'started_at': datetime.utcnow().isoformat() + 'Z', 'finished_at': datetime.utcnow().isoformat() + 'Z', 'new_files': new_files, } try: (doc_dir / 'manifest.json').write_text(json.dumps(manifest, indent=2)) except Exception as e: logger.warning(f"Unable to write manifest for {doc_type}-{congress}: {e}") if failure_count > 0: try: (doc_dir / 'failures.json').write_text(json.dumps({'failed_urls': failed_urls}, indent=2)) except Exception as e: logger.warning(f"Unable to write failures log for {doc_type}-{congress}: {e}") logger.info( f"Processed {success_count}/{attempted} {doc_type} files for Congress {congress} " f"(new: {len(new_files)}, dir_total: {total_files_in_dir}, failures: {failure_count})" ) return success_count async def process_congress( self, session: aiohttp.ClientSession, congress: int, doc_types: list[str] | None = None, ) -> dict[str, int]: """Process all document types for a specific congress.""" if doc_types is None: doc_types = DOCUMENT_TYPES results = {} congress_start_time = time.monotonic() total_attempted = 0 total_succeeded = 0 total_failed = 0 total_skipped = 0 logger.info(f"Starting Congress {congress} processing for {len(doc_types)} document types") for doc_type in doc_types: doc_start_time = time.monotonic() count = await self.process_document_type(session, congress, doc_type) doc_elapsed = time.monotonic() - doc_start_time # Get detailed stats for this document type doc_dir = self.output_dir / str(congress) / doc_type if doc_dir.exists(): total_files = len(list(doc_dir.glob('*.xml'))) manifest_path = doc_dir / 'manifest.json' if manifest_path.exists(): try: manifest = json.loads(manifest_path.read_text()) attempted = manifest.get('attempted', 0) succeeded = manifest.get('succeeded', 0) failed = manifest.get('failed', 0) skipped = attempted - succeeded - failed except Exception: attempted = succeeded = failed = skipped = 0 else: attempted = succeeded = count failed = 0 skipped = 0 else: total_files = attempted = succeeded = failed = skipped = 0 results[doc_type] = succeeded total_attempted += attempted total_succeeded += succeeded total_failed += failed total_skipped += skipped logger.info( f"Congress {congress} - {doc_type}: {succeeded}/{attempted} files " f"(failed: {failed}, skipped: {skipped}, total_in_dir: {total_files}, " f"time: {doc_elapsed:.1f}s)" ) congress_elapsed = time.monotonic() - congress_start_time # Congress-level summary logger.info( f"Congress {congress} COMPLETE: {total_succeeded}/{total_attempted} total files " f"(failed: {total_failed}, skipped: {total_skipped}, time: {congress_elapsed:.1f}s)" ) return results async def run( self, congresses: list[int] | None = None, doc_types: list[str] | None = None, ) -> dict[int, dict[str, int]]: """Run the ingestion process.""" if congresses is None: congresses = CONGRESS_SESSIONS results = {} overall_start_time = time.monotonic() total_attempted_all = 0 total_succeeded_all = 0 total_failed_all = 0 total_skipped_all = 0 logger.info(f"Starting ingestion for {len(congresses)} congresses: {congresses}") async with aiohttp.ClientSession() as session: for congress in congresses: results[congress] = await self.process_congress( session, congress, doc_types ) # Accumulate totals from this congress for doc_type, count in results[congress].items(): doc_dir = self.output_dir / str(congress) / doc_type if doc_dir.exists(): manifest_path = doc_dir / 'manifest.json' if manifest_path.exists(): try: manifest = json.loads(manifest_path.read_text()) total_attempted_all += manifest.get('attempted', 0) total_succeeded_all += manifest.get('succeeded', 0) total_failed_all += manifest.get('failed', 0) except Exception: total_attempted_all += count total_succeeded_all += count else: total_attempted_all += count total_succeeded_all += count total_skipped_all = total_attempted_all - total_succeeded_all - total_failed_all overall_elapsed = time.monotonic() - overall_start_time # Final comprehensive summary logger.info("=" * 80) logger.info("INGESTION COMPLETE - FINAL SUMMARY") logger.info("=" * 80) logger.info(f"Congresses processed: {congresses}") logger.info(f"Document types: {doc_types if doc_types else 'ALL'}") logger.info(f"Total time: {overall_elapsed:.1f}s ({overall_elapsed/60:.1f}m)") logger.info(f"Overall results: {total_succeeded_all}/{total_attempted_all} files") logger.info(f" - Succeeded: {total_succeeded_all}") logger.info(f" - Failed: {total_failed_all}") logger.info(f" - Skipped: {total_skipped_all}") logger.info(f" - Success rate: {(total_succeeded_all/total_attempted_all*100):.1f}%" if total_attempted_all > 0 else " - Success rate: N/A") # Per-congress breakdown logger.info("Per-congress breakdown:") for congress in congresses: congress_total = sum(results[congress].values()) logger.info(f" Congress {congress}: {congress_total} files downloaded") for doc_type, count in results[congress].items(): if count > 0: logger.info(f" - {doc_type}: {count}") logger.info("=" * 80) return results async def ingest_congress_data( congress: int, doc_types: list[str] | None = None, output_dir: Path | None = None, workers: int = WORKERS, ) -> dict[str, int]: """ Ingest data for a specific congress. Args: congress: The congress number (e.g., 115) doc_types: List of document types to download (None for all) output_dir: Directory to save downloaded files workers: Number of parallel downloads Returns: Dictionary with counts of downloaded files by document type """ ingestor = GovInfoIngestor(output_dir=output_dir or OUTPUT_DIR, workers=workers) async with aiohttp.ClientSession() as session: return await ingestor.process_congress(session, congress, doc_types) async def ingest_all_congresses( congresses: list[int] | None = None, doc_types: list[str] | None = None, output_dir: Path | None = None, workers: int = WORKERS, ) -> dict[int, dict[str, int]]: """ Ingest data for multiple congresses. Args: congresses: List of congress numbers (None for all) doc_types: List of document types to download (None for all) output_dir: Directory to save downloaded files workers: Number of parallel downloads per congress Returns: Nested dictionary with counts of downloaded files by congress and document type """ ingestor = GovInfoIngestor(output_dir=output_dir or OUTPUT_DIR, workers=workers) return await ingestor.run(congresses, doc_types) def main(): """Command-line entry point.""" import argparse parser = argparse.ArgumentParser(description="Download bulk data from govinfo.gov") parser.add_argument( "--congress", type=int, nargs="+", help="Congress numbers to download (default: all 113-119)", ) parser.add_argument( "--doc-types", nargs="+", choices=DOCUMENT_TYPES, help=f"Document types to download (default: all: {', '.join(DOCUMENT_TYPES)})", ) parser.add_argument( "--output", type=Path, default=OUTPUT_DIR, help=f"Output directory (default: {OUTPUT_DIR})", ) parser.add_argument( "--workers", type=int, default=WORKERS, help=f"Number of parallel downloads (default: {WORKERS})", ) parser.add_argument( "--log-level", default=LOG_LEVEL, choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], help="Logging level (default: INFO)", ) args = parser.parse_args() # Configure logging logging.basicConfig(level=args.log_level) # Run the ingestion asyncio.run( ingest_all_congresses( congresses=args.congress, doc_types=args.doc_types, output_dir=args.output, workers=args.workers, ) ) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cbwinslow/opendiscourse_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server