Skip to main content
Glama
coldfire-x

Owner avatar beijing-car-quota-draw

pdf_parser.py (11.1 kB)
"""
PDF parser for Beijing car quota lottery results.

Handles two main formats:
1. Waiting List (轮候序号列表) - Format: 序号 申请编码 轮候时间
2. Score Ranking (积分排序入围名单) - Format: 序号 申请编码 姓名 身份证号 家庭代际数 积分 注册时间
"""

import re
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any

import pdfplumber

from ..models.quota_result import (
    QuotaResult, QuotaType, WaitingListEntry, ScoreRankingEntry, PDFMetadata
)

logger = logging.getLogger(__name__)


class PDFFormatDetector:
    """Detects the format of Beijing car quota lottery PDF files."""

    # Column headers looked for when recognising the score-ranking layout.
    _SCORE_INDICATORS = (
        "主申请人姓名",
        "主申请人证件号码",
        "家庭总积分",
        "家庭代际数",
    )

    # Column headers looked for when recognising the waiting-list layout.
    _WAITING_INDICATORS = (
        "轮候时间",
        "申请编码",
    )

    @staticmethod
    def detect_format(text_sample: str) -> QuotaType:
        """
        Detect PDF format based on text content.

        The sample is classified as a score ranking when at least two of the
        score-ranking headers occur in it. Otherwise it is a waiting list when
        at least one waiting-list header occurs and the word "积分" does not
        appear anywhere. Everything else (including an empty sample) is
        reported as unknown.

        Args:
            text_sample: Sample text from the PDF (first few pages)

        Returns:
            QuotaType indicating the detected format
        """
        score_count = sum(
            1 for indicator in PDFFormatDetector._SCORE_INDICATORS
            if indicator in text_sample
        )
        waiting_count = sum(
            1 for indicator in PDFFormatDetector._WAITING_INDICATORS
            if indicator in text_sample
        )

        if score_count >= 2:
            return QuotaType.SCORE_RANKING
        if waiting_count >= 1 and "积分" not in text_sample:
            return QuotaType.WAITING_LIST
        return QuotaType.UNKNOWN


class PDFParser:
    """Parser for Beijing car quota lottery PDF files."""

    def __init__(self):
        """Create the format detector and compile the per-row patterns."""
        self.format_detector = PDFFormatDetector()

        # Waiting list row: sequence number, application code, timestamp
        # with millisecond precision. Anchored on both ends so that headers
        # and page footers can never match.
        self.waiting_list_pattern = re.compile(
            r'^(\d+)\s+(\d+)\s+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\.\d{3})$'
        )

        # Score ranking row: sequence number, application code, name, masked
        # ID number, generation count, score, timestamp. The name group is
        # lazy and excludes digits/whitespace because name lengths vary.
        # The last character of the masked ID accepts X/x as well as a
        # digit: the final position of an 18-character resident ID is a
        # check character that can be "X", and requiring a digit there
        # silently dropped those applicants.
        self.score_ranking_pattern = re.compile(
            r'^(\d+)\s+(\d+)\s+([^\d\s]+?)\s+(\d{6}\*+\d{3}[\dXx])'
            r'\s+(\d+)\s+(\d+)'
            r'\s+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\.\d{3})$'
        )

    def parse_pdf(self, pdf_path: Path, source_url: str = "") -> QuotaResult:
        """
        Parse a PDF file and extract quota lottery results.

        Args:
            pdf_path: Path to the PDF file
            source_url: URL where the PDF was downloaded from

        Returns:
            QuotaResult containing parsed data

        Raises:
            Exception: Any error raised while opening or reading the PDF is
                logged and then re-raised unchanged.
        """
        # Accept plain strings as well; .stat() and .name below need a Path.
        pdf_path = Path(pdf_path)
        logger.info(f"Parsing PDF: {pdf_path}")

        try:
            with pdfplumber.open(pdf_path) as pdf:
                # One stat() call supplies both the size and the mtime.
                file_stat = pdf_path.stat()
                page_count = len(pdf.pages)

                # Collect text from at most the first 3 pages, stopping
                # early once there is enough for format detection.
                sample_text = ""
                for page in pdf.pages[:3]:
                    sample_text += page.extract_text() or ""
                    if len(sample_text) > 2000:
                        break

                quota_type = self.format_detector.detect_format(sample_text)
                logger.info(f"Detected format: {quota_type}")

                # Collect the raw data lines of every page.
                all_entries = []
                for page_num, page in enumerate(pdf.pages):
                    page_text = page.extract_text()
                    if page_text:
                        all_entries.extend(
                            self._extract_entries_from_page(page_text, quota_type)
                        )

                    if page_num % 50 == 0:  # Log progress every 50 pages
                        logger.info(f"Processed {page_num + 1}/{page_count} pages")

                # The file's modification time stands in for the download time.
                metadata = PDFMetadata(
                    filename=pdf_path.name,
                    source_url=source_url,
                    download_time=datetime.fromtimestamp(file_stat.st_mtime),
                    file_size=file_stat.st_size,
                    page_count=page_count,
                    entry_count=len(all_entries),
                    quota_type=quota_type,
                    processing_time=datetime.now()
                )

                result = QuotaResult(metadata=metadata)

                # Convert raw lines into typed entries. Nothing is parsed
                # when the format is unknown.
                if quota_type == QuotaType.WAITING_LIST:
                    result.waiting_list_entries = self._parse_waiting_list_entries(all_entries)
                elif quota_type == QuotaType.SCORE_RANKING:
                    result.score_ranking_entries = self._parse_score_ranking_entries(all_entries)

                # Build indexes for fast lookup
                result.build_indexes()

                logger.info(f"Successfully parsed {len(all_entries)} entries from {pdf_path}")
                return result

        except Exception as e:
            logger.error(f"Error parsing PDF {pdf_path}: {e}")
            raise

    def _extract_entries_from_page(self, page_text: str, quota_type: QuotaType) -> List[str]:
        """
        Extract data entries from a page of text.

        A line is kept when, after stripping surrounding whitespace, it
        matches the row pattern of the detected format (or either pattern
        when the format is unknown). Both patterns are anchored and require
        numeric and timestamp fields, so headers and page footers are
        rejected by the match itself. No keyword-based header filter is
        applied: such a filter discards valid score-ranking rows whose
        applicant name happens to contain a header character such as
        "共" or "页".

        Args:
            page_text: Text of a single page, lines separated by newlines
            quota_type: Format detected for the document

        Returns:
            The stripped lines that look like data rows, in page order
        """
        if quota_type == QuotaType.WAITING_LIST:
            patterns = (self.waiting_list_pattern,)
        elif quota_type == QuotaType.SCORE_RANKING:
            patterns = (self.score_ranking_pattern,)
        else:
            # For unknown format, try both patterns
            patterns = (self.waiting_list_pattern, self.score_ranking_pattern)

        entries = []
        for raw_line in page_text.split('\n'):
            line = raw_line.strip()
            if line and any(pattern.match(line) for pattern in patterns):
                entries.append(line)
        return entries

    def _parse_waiting_list_entries(self, raw_entries: List[str]) -> List[WaitingListEntry]:
        """
        Parse waiting list entries.

        Lines that do not match the waiting list pattern are skipped; lines
        whose fields cannot be converted are logged as warnings and skipped.

        Args:
            raw_entries: Raw data lines

        Returns:
            One WaitingListEntry per successfully parsed line, in input order
        """
        entries = []

        for raw_entry in raw_entries:
            match = self.waiting_list_pattern.match(raw_entry)
            if not match:
                continue
            try:
                entry = WaitingListEntry(
                    sequence_number=int(match.group(1)),
                    application_code=match.group(2),
                    waiting_time=datetime.strptime(match.group(3), "%Y-%m-%d %H:%M:%S.%f")
                )
                entries.append(entry)
            except (ValueError, IndexError) as e:
                logger.warning(f"Failed to parse waiting list entry: {raw_entry}, error: {e}")

        return entries

    def _parse_score_ranking_entries(self, raw_entries: List[str]) -> List[ScoreRankingEntry]:
        """
        Parse score ranking entries.

        Lines that do not match the score ranking pattern are skipped; lines
        whose fields cannot be converted are logged as warnings and skipped.

        Args:
            raw_entries: Raw data lines

        Returns:
            One ScoreRankingEntry per successfully parsed line, in input order
        """
        entries = []

        for raw_entry in raw_entries:
            match = self.score_ranking_pattern.match(raw_entry)
            if not match:
                continue
            try:
                entry = ScoreRankingEntry(
                    sequence_number=int(match.group(1)),
                    application_code=match.group(2),
                    applicant_name=match.group(3).strip(),
                    id_number=match.group(4),
                    family_generation_count=int(match.group(5)),
                    total_family_score=int(match.group(6)),
                    earliest_registration_time=datetime.strptime(
                        match.group(7), "%Y-%m-%d %H:%M:%S.%f"
                    )
                )
                entries.append(entry)
            except (ValueError, IndexError) as e:
                logger.warning(f"Failed to parse score ranking entry: {raw_entry}, error: {e}")

        return entries

    def validate_parsed_data(self, result: QuotaResult) -> Dict[str, Any]:
        """
        Validate parsed data and return validation report.

        Args:
            result: Parsed result to check

        Returns:
            A dict with the keys "is_valid" (False only when no entries were
            parsed at all), "errors", "warnings" (non-continuous sequence
            numbers, duplicate application codes) and "statistics" (whatever
            result.get_statistics() returns).
        """
        report = {
            "is_valid": True,
            "errors": [],
            "warnings": [],
            "statistics": result.get_statistics()
        }

        # Check if we have any entries
        total_entries = len(result.waiting_list_entries) + len(result.score_ranking_entries)
        if total_entries == 0:
            report["is_valid"] = False
            report["errors"].append("No valid entries found in PDF")

        # Only the list that belongs to the detected format is inspected;
        # an unknown format contributes nothing to the checks below.
        quota_type = result.metadata.quota_type
        if quota_type == QuotaType.WAITING_LIST:
            entries = result.waiting_list_entries
        elif quota_type == QuotaType.SCORE_RANKING:
            entries = result.score_ranking_entries
        else:
            entries = []

        # Sequence numbers are expected to run 1..N in document order.
        sequence_numbers = [entry.sequence_number for entry in entries]
        if sequence_numbers:
            expected_sequence = list(range(1, len(sequence_numbers) + 1))
            if sequence_numbers != expected_sequence:
                report["warnings"].append("Sequence numbers are not continuous")

        # Check for duplicate application codes
        app_codes = [entry.application_code for entry in entries]
        if len(app_codes) != len(set(app_codes)):
            report["warnings"].append("Duplicate application codes found")

        return report

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/coldfire-x/beijing-car-quota-draw-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.