Skip to main content
Glama
coldfire-x

Owner avatar beijing-car-quota-draw

data_store.py13 kB
""" Data storage and management for Beijing car quota lottery results. Provides in-memory storage with disk persistence for quota results. """ import json import logging from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any import aiofiles from ..models.quota_result import QuotaResult, QuotaType logger = logging.getLogger(__name__) class DataStore: """In-memory data store with disk persistence for quota results.""" def __init__(self, storage_dir: Path = Path("data")): """ Initialize the data store. Args: storage_dir: Directory to store persistent data """ self.storage_dir = storage_dir self.storage_dir.mkdir(exist_ok=True) # In-memory storage self.quota_results: Dict[str, QuotaResult] = {} # Index for fast lookups self.application_code_index: Dict[str, List[str]] = {} # code -> [filenames] self.id_number_index: Dict[str, List[str]] = {} # partial_id -> [filenames] # Metadata self.last_update: Optional[datetime] = None self.total_entries: int = 0 async def add_quota_result(self, result: QuotaResult) -> None: """ Add a quota result to the store. Args: result: QuotaResult to add """ filename = result.metadata.filename logger.info(f"Adding quota result: {filename}") # Store in memory self.quota_results[filename] = result # Update indexes self._update_indexes(filename, result) # Update metadata self.last_update = datetime.now() self.total_entries += result.metadata.entry_count # Persist to disk await self._save_result_to_disk(result) await self._save_metadata() logger.info(f"Added {result.metadata.entry_count} entries from {filename}") def _update_indexes(self, filename: str, result: QuotaResult) -> None: """Update search indexes with new result.""" # Index application codes if result.metadata.quota_type == QuotaType.WAITING_LIST: for entry in result.waiting_list_entries: if entry.application_code not in self.application_code_index: self.application_code_index[entry.application_code] = [] self.application_code_index[entry.application_code].append(filename) elif result.metadata.quota_type == QuotaType.SCORE_RANKING: for entry in result.score_ranking_entries: # Index application code if entry.application_code not in self.application_code_index: self.application_code_index[entry.application_code] = [] self.application_code_index[entry.application_code].append(filename) # Index partial ID number if entry.id_number and len(entry.id_number) >= 10: id_prefix = entry.id_number[:6] id_suffix = entry.id_number[-4:] partial_id = f"{id_prefix}****{id_suffix}" if partial_id not in self.id_number_index: self.id_number_index[partial_id] = [] self.id_number_index[partial_id].append(filename) async def find_by_application_code(self, application_code: str) -> List[Dict[str, Any]]: """ Find entries by application code across all results. Args: application_code: Application code to search for Returns: List of matching entries with source information """ results = [] if application_code in self.application_code_index: for filename in self.application_code_index[application_code]: if filename in self.quota_results: quota_result = self.quota_results[filename] entry_data = quota_result.find_by_application_code(application_code) if entry_data: entry_data["source_file"] = filename entry_data["source_url"] = quota_result.metadata.source_url entry_data["download_time"] = quota_result.metadata.download_time results.append(entry_data) return results async def find_by_partial_id(self, id_prefix: str, id_suffix: str) -> List[Dict[str, Any]]: """ Find entries by partial ID number across all results. Args: id_prefix: First 6 digits of ID number id_suffix: Last 4 digits of ID number Returns: List of matching entries with source information """ results = [] partial_id = f"{id_prefix}****{id_suffix}" if partial_id in self.id_number_index: for filename in self.id_number_index[partial_id]: if filename in self.quota_results: quota_result = self.quota_results[filename] entry_list = quota_result.find_by_partial_id(id_prefix, id_suffix) for entry_data in entry_list: entry_data["source_file"] = filename entry_data["source_url"] = quota_result.metadata.source_url entry_data["download_time"] = quota_result.metadata.download_time results.append(entry_data) return results async def find_by_id_prefix_or_suffix(self, id_prefix: Optional[str] = None, id_suffix: Optional[str] = None) -> List[Dict[str, Any]]: """ Find entries by ID prefix and/or suffix across all results. Args: id_prefix: First 6 digits of ID number (optional) id_suffix: Last 4 digits of ID number (optional) Returns: List of matching entries with source information """ if not id_prefix and not id_suffix: return [] results = [] # If both prefix and suffix provided, use exact match if id_prefix and id_suffix: return await self.find_by_partial_id(id_prefix, id_suffix) # Search all indexed ID numbers for matches for partial_id, filenames in self.id_number_index.items(): # partial_id format: "123456****7890" current_prefix = partial_id[:6] current_suffix = partial_id[-4:] # Check if current ID matches search criteria match = False if id_prefix and current_prefix == id_prefix: match = True elif id_suffix and current_suffix == id_suffix: match = True if match: # Get entries from matching files for filename in filenames: if filename in self.quota_results: quota_result = self.quota_results[filename] entry_list = quota_result.find_by_partial_id(current_prefix, current_suffix) for entry_data in entry_list: entry_data["source_file"] = filename entry_data["source_url"] = quota_result.metadata.source_url entry_data["download_time"] = quota_result.metadata.download_time results.append(entry_data) return results async def get_statistics(self) -> Dict[str, Any]: """Get statistics about stored data.""" stats = { "total_files": len(self.quota_results), "total_entries": self.total_entries, "last_update": self.last_update, "application_codes_indexed": len(self.application_code_index), "id_numbers_indexed": len(self.id_number_index), "files_by_type": {}, "files": [] } # Count by type for filename, result in self.quota_results.items(): quota_type = result.metadata.quota_type.value if quota_type not in stats["files_by_type"]: stats["files_by_type"][quota_type] = 0 stats["files_by_type"][quota_type] += 1 # Add file info stats["files"].append({ "filename": filename, "type": quota_type, "entries": result.metadata.entry_count, "source_url": result.metadata.source_url, "download_time": result.metadata.download_time }) return stats async def load_from_disk(self) -> None: """Load stored results from disk.""" logger.info("Loading quota results from disk") results_dir = self.storage_dir / "results" if not results_dir.exists(): logger.info("No stored results found") return loaded_count = 0 for result_file in results_dir.glob("*.json"): try: async with aiofiles.open(result_file, 'r', encoding='utf-8') as f: data = json.loads(await f.read()) # Reconstruct QuotaResult object result = QuotaResult.parse_obj(data) # Add to store filename = result.metadata.filename self.quota_results[filename] = result self._update_indexes(filename, result) self.total_entries += result.metadata.entry_count loaded_count += 1 except Exception as e: logger.error(f"Error loading result from {result_file}: {e}") # Load metadata await self._load_metadata() logger.info(f"Loaded {loaded_count} quota results from disk") async def _save_result_to_disk(self, result: QuotaResult) -> None: """Save a single result to disk.""" results_dir = self.storage_dir / "results" results_dir.mkdir(exist_ok=True) filename = result.metadata.filename result_file = results_dir / f"{filename}.json" # Convert to dict for JSON serialization data = result.dict() async with aiofiles.open(result_file, 'w', encoding='utf-8') as f: await f.write(json.dumps(data, ensure_ascii=False, indent=2, default=str)) async def _save_metadata(self) -> None: """Save store metadata to disk.""" metadata = { "last_update": self.last_update.isoformat() if self.last_update else None, "total_entries": self.total_entries, "total_files": len(self.quota_results) } metadata_file = self.storage_dir / "metadata.json" async with aiofiles.open(metadata_file, 'w', encoding='utf-8') as f: await f.write(json.dumps(metadata, ensure_ascii=False, indent=2)) async def _load_metadata(self) -> None: """Load store metadata from disk.""" metadata_file = self.storage_dir / "metadata.json" if not metadata_file.exists(): return try: async with aiofiles.open(metadata_file, 'r', encoding='utf-8') as f: metadata = json.loads(await f.read()) if metadata.get("last_update"): self.last_update = datetime.fromisoformat(metadata["last_update"]) except Exception as e: logger.error(f"Error loading metadata: {e}") async def clear_all(self) -> None: """Clear all stored data.""" logger.info("Clearing all stored data") self.quota_results.clear() self.application_code_index.clear() self.id_number_index.clear() self.last_update = None self.total_entries = 0 # Clear disk storage results_dir = self.storage_dir / "results" if results_dir.exists(): for result_file in results_dir.glob("*.json"): result_file.unlink() metadata_file = self.storage_dir / "metadata.json" if metadata_file.exists(): metadata_file.unlink() def get_result_by_filename(self, filename: str) -> Optional[QuotaResult]: """Get a specific result by filename.""" return self.quota_results.get(filename) def list_filenames(self) -> List[str]: """Get list of all stored filenames.""" return list(self.quota_results.keys())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/coldfire-x/beijing-car-quota-draw-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server