coldfire-x / beijing-car-quota-draw

web_scraper.py (10.7 kB)
""" Web scraper for Beijing Transportation Commission car quota lottery results. Uses crawl4ai to scrape https://xkczb.jtw.beijing.gov.cn/jggb/index.html and download relevant PDF files. """ import asyncio import logging import re from datetime import datetime from pathlib import Path from typing import List, Dict, Optional, Tuple from urllib.parse import urljoin, urlparse import aiofiles import httpx from crawl4ai import AsyncWebCrawler logger = logging.getLogger(__name__) class WebScraper: """Scraper for Beijing Transportation Commission website.""" def __init__(self, download_dir: Path = Path("downloads")): """ Initialize the web scraper. Args: download_dir: Directory to save downloaded PDF files """ self.base_url = "https://xkczb.jtw.beijing.gov.cn/jggb/index.html" self.download_dir = download_dir self.download_dir.mkdir(exist_ok=True) # Keywords to match for relevant links self.target_keywords = [ "北京市家庭新能源小客车指标", "北京市单位新能源小客车指标", "北京市个人新能源" ] # HTTP client for downloading files self.http_client = httpx.AsyncClient( timeout=30.0, verify=False, # Disable SSL verification to avoid certificate issues headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } ) async def scrape_and_download(self, max_pages: int = 5) -> List[Dict[str, str]]: """ Scrape the website and download relevant PDF files. Args: max_pages: Maximum number of pages to scrape Returns: List of dictionaries containing PDF info: {filename, url, title} """ logger.info(f"Starting scrape of {self.base_url}, max pages: {max_pages}") downloaded_files = [] try: async with AsyncWebCrawler(verbose=True) as crawler: # Scrape each page for page_num in range(1, max_pages + 1): page_url = self._get_page_url(page_num) logger.info(f"Scraping page {page_num}: {page_url}") # Crawl the page result = await crawler.arun(url=page_url) if result.success: # Extract relevant links relevant_links = self._extract_relevant_links( result.html, page_url ) logger.info(f"Found {len(relevant_links)} relevant links on page {page_num}") # Process each relevant link for link_info in relevant_links: try: pdf_files = await self._process_link(crawler, link_info) downloaded_files.extend(pdf_files) except Exception as e: logger.error(f"Error processing link {link_info['url']}: {e}") else: logger.warning(f"Failed to scrape page {page_num}: {result.error_message}") # Add delay between pages await asyncio.sleep(2) except Exception as e: logger.error(f"Error during scraping: {e}") raise logger.info(f"Scraping completed. Downloaded {len(downloaded_files)} PDF files") return downloaded_files def _get_page_url(self, page_num: int) -> str: """Get URL for a specific page number.""" if page_num == 1: return self.base_url else: # Assuming pagination pattern - may need adjustment based on actual site return f"https://xkczb.jtw.beijing.gov.cn/jggb/index_{page_num}.html" def _extract_relevant_links(self, html_content: str, base_url: str) -> List[Dict[str, str]]: """ Extract links that match target keywords. 
Args: html_content: HTML content of the page base_url: Base URL for resolving relative links Returns: List of relevant link information """ relevant_links = [] # Pattern to match links with titles link_pattern = re.compile( r'<a[^>]*href=["\']([^"\']+)["\'][^>]*>([^<]+)</a>', re.IGNORECASE | re.DOTALL ) matches = link_pattern.findall(html_content) for href, title in matches: title = title.strip() # Check if title matches any target keywords if any(keyword in title for keyword in self.target_keywords): full_url = urljoin(base_url, href) relevant_links.append({ "url": full_url, "title": title, "href": href }) logger.info(f"Found relevant link: {title} -> {full_url}") return relevant_links async def _process_link(self, crawler: AsyncWebCrawler, link_info: Dict[str, str]) -> List[Dict[str, str]]: """ Process a relevant link and download any PDF files found. Args: crawler: AsyncWebCrawler instance link_info: Information about the link to process Returns: List of downloaded PDF file information """ logger.info(f"Processing link: {link_info['title']}") # Crawl the target page result = await crawler.arun(url=link_info["url"]) if not result.success: logger.warning(f"Failed to crawl {link_info['url']}: {result.error_message}") return [] # Extract PDF links from the page pdf_links = self._extract_pdf_links(result.html, link_info["url"]) downloaded_files = [] # Download each PDF for pdf_url in pdf_links: try: filename = await self._download_pdf(pdf_url, link_info["url"]) if filename: downloaded_files.append({ "filename": filename, "url": pdf_url, "source_page": link_info["url"], "title": link_info["title"] }) except Exception as e: logger.error(f"Error downloading PDF {pdf_url}: {e}") return downloaded_files def _extract_pdf_links(self, html_content: str, base_url: str) -> List[str]: """Extract PDF download links from HTML content.""" pdf_links = [] # Pattern to match PDF links pdf_pattern = re.compile( r'<a[^>]*href=["\']([^"\']*\.pdf[^"\']*)["\']', re.IGNORECASE ) matches = pdf_pattern.findall(html_content) for href in matches: full_url = urljoin(base_url, href) pdf_links.append(full_url) logger.info(f"Found PDF link: {full_url}") return pdf_links async def _download_pdf(self, pdf_url: str, source_url: str) -> Optional[str]: """ Download a PDF file. 
Args: pdf_url: URL of the PDF to download source_url: URL of the page containing the PDF link Returns: Filename of the downloaded file, or None if failed """ try: logger.info(f"Downloading PDF: {pdf_url}") # Generate filename timestamp = int(datetime.now().timestamp() * 1000) filename = f"{timestamp}.pdf" filepath = self.download_dir / filename # Download the file response = await self.http_client.get(pdf_url) response.raise_for_status() # Save to file async with aiofiles.open(filepath, 'wb') as f: await f.write(response.content) # Log download info file_size = len(response.content) logger.info(f"Downloaded {filename} ({file_size} bytes) from {pdf_url}") # Save mapping info await self._save_url_mapping(filename, pdf_url, source_url) return filename except Exception as e: logger.error(f"Failed to download PDF {pdf_url}: {e}") return None async def _save_url_mapping(self, filename: str, pdf_url: str, source_url: str): """Save mapping between filename and URLs.""" mapping_file = self.download_dir / "url_mapping.txt" mapping_line = f"{filename} {pdf_url} {source_url}\n" async with aiofiles.open(mapping_file, 'a', encoding='utf-8') as f: await f.write(mapping_line) async def get_existing_downloads(self) -> List[Dict[str, str]]: """Get list of already downloaded files.""" mapping_file = self.download_dir / "url_mapping.txt" if not mapping_file.exists(): return [] downloads = [] async with aiofiles.open(mapping_file, 'r', encoding='utf-8') as f: async for line in f: parts = line.strip().split(' ', 2) if len(parts) >= 2: downloads.append({ "filename": parts[0], "url": parts[1], "source_page": parts[2] if len(parts) > 2 else "" }) return downloads def cleanup_old_files(self, keep_days: int = 30): """Clean up old downloaded files.""" cutoff_time = datetime.now().timestamp() - (keep_days * 24 * 3600) for pdf_file in self.download_dir.glob("*.pdf"): if pdf_file.stat().st_mtime < cutoff_time: logger.info(f"Removing old file: {pdf_file}") pdf_file.unlink() async def __aenter__(self): """Async context manager entry.""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.http_client.aclose()
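For reference, a minimal driver script showing how the class above might be used. This is a sketch, not part of the repository: it assumes the module is importable as web_scraper and that crawl4ai, httpx, and aiofiles are installed.

# usage_example.py (hypothetical; adjust the import path to the actual package layout)
import asyncio

from web_scraper import WebScraper


async def main() -> None:
    # The async context manager guarantees the shared httpx client is closed on exit.
    async with WebScraper(download_dir=Path("downloads")) as scraper:
        # Check what was already fetched on previous runs (read from url_mapping.txt).
        existing = await scraper.get_existing_downloads()
        print(f"{len(existing)} files already downloaded")

        # Crawl up to 3 listing pages and download any matching quota-result PDFs.
        downloaded = await scraper.scrape_and_download(max_pages=3)
        for item in downloaded:
            print(item["filename"], "<-", item["title"])

        # Optionally prune PDFs older than 30 days.
        scraper.cleanup_old_files(keep_days=30)


if __name__ == "__main__":
    from pathlib import Path
    asyncio.run(main())

Because __aexit__ only closes the httpx client, reusing one WebScraper instance per run (rather than per page) keeps connection pooling effective.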

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/coldfire-x/beijing-car-quota-draw-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.