Skip to main content
Glama

QQ Channel Data Collector

by elleys
complete_hellokitty_downloader.py18.2 kB
#!/usr/bin/env python3 """ 完整HelloKitty图片下载器 - 基于Chrome方案 使用经过验证的enhanced_channel_scraper获取帖子,然后下载所有图片 """ import os import asyncio import aiohttp import time from datetime import datetime, timedelta from pathlib import Path from urllib.parse import urlparse import logging # 导入智能增量管理器 try: from .smart_incremental_manager import SmartIncrementalManager except ImportError: from smart_incremental_manager import SmartIncrementalManager # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class CompleteHelloKittyDownloader: """完整的HelloKitty图片下载器""" def __init__(self, download_dir=None): if download_dir is None: # 使用新的目录管理器 from src.utils.directory_manager import get_media_subdirectory self.download_dir = get_media_subdirectory("images") else: self.download_dir = Path(download_dir) self.download_dir.mkdir(parents=True, exist_ok=True) # 增量更新管理 self.post_mapping = {} # 存储帖子ID到序号的映射 self.existing_posts = set() # 已存在的帖子ID集合 # 统计信息 self.stats = { "total_posts": 0, "posts_with_images": 0, "total_images": 0, "downloaded_images": 0, "failed_downloads": 0, "start_time": time.time() } def filter_24h_posts(self, posts): """筛选24小时内的帖子""" now = datetime.now() filtered_posts = [] for post in posts: # 使用post.post_time(datetime对象)或创建相对时间字符串 time_str = self._get_time_string(post) if self._is_within_24h(time_str, now): filtered_posts.append(post) logger.info(f"筛选出24小时内的帖子: {len(filtered_posts)}/{len(filtered_posts)}") return filtered_posts def load_existing_post_mapping(self): """加载已存在的帖子映射关系""" try: # 扫描现有文件,重建帖子映射 for file_path in self.download_dir.glob("*"): if file_path.is_file(): filename = file_path.name # 解析文件名获取帖子序号 if "_image_" in filename: parts = filename.split("_") if len(parts) >= 3: post_number = parts[0] # 这里可以进一步解析帖子ID,暂时使用文件名作为标识 self.post_mapping[filename] = post_number self.existing_posts.add(filename) logger.info(f"加载了 {len(self.post_mapping)} 个现有帖子的映射关系") except Exception as e: logger.warning(f"加载现有帖子映射失败: {e}") def get_post_number_for_update(self, post, total_posts, post_idx): """为增量更新获取帖子序号""" # 只对有图片的帖子分配序号,确保序号连续 # 新帖子获得较大编号,旧帖子获得较小编号 post_number = total_posts - post_idx # 记录映射关系(用于后续增量更新) post_id = getattr(post, 'post_id', None) or f"post_{post_idx}" self.post_mapping[post_id] = str(post_number).zfill(3) return str(post_number).zfill(3) def _get_time_string(self, post): """获取帖子的时间字符串,优先使用相对时间格式""" # 如果post.post_time是datetime对象,计算相对时间 if hasattr(post, 'post_time') and post.post_time: try: now = datetime.now() if post.post_time.tzinfo: # 如果有时区信息,转换为本地时间 now = now.replace(tzinfo=post.post_time.tzinfo) time_diff = now - post.post_time # 计算相对时间 total_seconds = int(time_diff.total_seconds()) if total_seconds < 60: return f"{total_seconds}秒前" elif total_seconds < 3600: minutes = total_seconds // 60 return f"{minutes}分钟前" elif total_seconds < 86400: hours = total_seconds // 3600 return f"{hours}小时前" else: days = total_seconds // 86400 return f"{days}天前" except Exception as e: logger.warning(f"时间计算失败: {e}") # 回退到字符串时间 if hasattr(post, 'post_time_str'): return post.post_time_str # 默认认为是今天的 return "0小时前" def _is_within_24h(self, time_str, now): """判断帖子是否在24小时内""" if not time_str: return False try: # 处理相对时间格式 if "分钟前" in time_str: return True elif "小时前" in time_str: hours = int(time_str.replace("小时前", "")) return hours <= 24 elif "天前" in time_str: days = int(time_str.replace("天前", "")) return days <= 1 elif "昨天" in time_str: return True elif "-" in time_str and len(time_str) <= 6: # MM-DD格式 month, day = map(int, time_str.split("-")) post_date = datetime(now.year, month, day) return (now - post_date).days <= 1 else: # 其他格式暂时认为是今天的 return True except Exception as e: logger.warning(f"时间解析失败: {time_str}, {e}") return False async def download_image(self, session, image_url, filename, post_title="", max_retries=3): """下载单张图片,支持重试机制""" for attempt in range(max_retries): try: # 设置超时和重试 timeout = aiohttp.ClientTimeout(total=30, connect=10) async with session.get(image_url, timeout=timeout) as response: if response.status == 200: content = await response.read() # 验证内容是否为空或过小 if len(content) < 100: # 小于100字节可能是错误页面 logger.warning(f"⚠️ 内容过小: {filename}, 大小: {len(content)} bytes") if attempt < max_retries - 1: await asyncio.sleep(1) # 等待1秒后重试 continue else: logger.error(f"❌ 下载失败: {filename}, 内容过小") self.stats["failed_downloads"] += 1 return False filepath = self.download_dir / filename with open(filepath, 'wb') as f: f.write(content) logger.info(f"✅ 下载成功: {filename} ({len(content)} bytes)") self.stats["downloaded_images"] += 1 return True else: logger.warning(f"❌ 下载失败: {filename}, HTTP {response.status}") if attempt < max_retries - 1: await asyncio.sleep(2) # 等待2秒后重试 continue else: self.stats["failed_downloads"] += 1 return False except asyncio.TimeoutError: logger.warning(f"⏰ 下载超时: {filename}, 尝试 {attempt + 1}/{max_retries}") if attempt < max_retries - 1: await asyncio.sleep(2) continue else: logger.error(f"❌ 下载失败: {filename}, 超时") self.stats["failed_downloads"] += 1 return False except Exception as e: logger.error(f"❌ 下载异常: {filename}, {e}") if attempt < max_retries - 1: await asyncio.sleep(1) continue else: self.stats["failed_downloads"] += 1 return False return False def _is_valid_image_url(self, url): """验证图片URL是否有效""" if not url: return False # 检查URL格式 if not url.startswith(('http://', 'https://')): return False # 检查是否为图片文件 valid_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'] url_lower = url.lower() # 检查文件扩展名 has_valid_ext = any(url_lower.endswith(ext) for ext in valid_extensions) # 检查URL是否包含图片相关关键词 has_image_keywords = any(keyword in url_lower for keyword in ['image', 'img', 'photo', 'pic']) # 检查URL长度(过长的URL可能有问题) if len(url) > 500: return False return has_valid_ext or has_image_keywords def _sort_posts_by_time(self, posts): """按照发帖时间排序帖子(最新在前)""" try: sorted_posts = sorted( posts, key=lambda p: p.post_time if hasattr(p, 'post_time') and p.post_time else datetime.min, reverse=True # 最新在前 ) logger.info(f"帖子排序完成,最新发帖时间: {sorted_posts[0].post_time if sorted_posts and hasattr(sorted_posts[0], 'post_time') and sorted_posts[0].post_time else '未知'}") return sorted_posts except Exception as e: logger.warning(f"帖子排序失败: {e}") return posts def generate_filename(self, image_url, post_index, image_index, post_time=None): """生成文件名""" # 解析文件扩展名 parsed_url = urlparse(image_url) path = parsed_url.path ext = Path(path).suffix or '.jpg' # 生成时间戳 if post_time and hasattr(post_time, 'strftime'): timestamp = post_time.strftime("%Y%m%d_%H%M%S") else: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # 新的文件名格式: 20250823_205541_image_02.jpg filename = f"{timestamp}_image_{image_index:02d}{ext}" return filename async def download_all_images(self, posts): """下载所有图片""" self.stats["total_posts"] = len(posts) # 初始化智能增量管理器 self.incremental_manager = SmartIncrementalManager(self.download_dir) # 筛选24小时内的帖子 recent_posts = self.filter_24h_posts(posts) if not recent_posts: logger.warning("没有找到24小时内的帖子") return # 收集所有图片URL download_tasks = [] image_count = 0 async with aiohttp.ClientSession() as session: # 统计新帖子数量并记录时间 new_posts_count = 0 new_posts = [] existing_posts_with_images = [] for post in recent_posts: if hasattr(post, 'images') and post.images: if self.incremental_manager.is_duplicate_post(post): logger.info(f"跳过重复帖子: {post.title or '无标题'}") # 这是已经存在的有图片的帖子,需要收集起来 existing_posts_with_images.append(post) continue # 记录帖子的发帖时间到增量管理器 if hasattr(post, 'post_time') and post.post_time: self.incremental_manager.record_post_time(post, post.post_time) new_posts.append(post) new_posts_count += 1 # 简化逻辑:直接处理所有新帖子,不需要序号 posts_with_images = new_posts + existing_posts_with_images self.stats["posts_with_images"] = len(posts_with_images) if new_posts_count > 0: logger.info(f"发现 {new_posts_count} 个新帖子,开始下载...") # 现在为所有帖子生成文件名 for post in posts_with_images: for img_idx, image_url in enumerate(post.images): if image_url and self._is_valid_image_url(image_url): # 确保URL不为空且有效 image_count += 1 filename = self.generate_filename(image_url, None, img_idx + 1, post.post_time) # 创建下载任务 task = self.download_image(session, image_url, filename, post.title or "") download_tasks.append(task) elif image_url: logger.warning(f"⚠️ 跳过无效图片URL: {image_url}") self.stats["failed_downloads"] += 1 self.stats["total_images"] = image_count logger.info(f"开始下载 {image_count} 张图片...") # 并发下载所有图片 if download_tasks: results = await asyncio.gather(*download_tasks, return_exceptions=True) # 统计下载结果 for result in results: if isinstance(result, Exception): logger.error(f"下载任务异常: {result}") self.stats["failed_downloads"] += 1 elif result is False: self.stats["failed_downloads"] += 1 # True 的情况已经在 download_image 中统计了 # 保存重复检测记录 self.incremental_manager.save_mapping() def print_statistics(self): """打印统计信息""" elapsed_time = time.time() - self.stats["start_time"] print("\n" + "="*50) print("📊 下载统计报告") print("="*50) print(f"📝 总帖子数: {self.stats['total_posts']}") print(f"🖼️ 含图帖子数: {self.stats['posts_with_images']}") print(f"🎯 总图片数: {self.stats['total_images']}") print(f"✅ 成功下载: {self.stats['downloaded_images']}") print(f"❌ 下载失败: {self.stats['failed_downloads']}") print(f"⏱️ 耗时: {elapsed_time:.2f} 秒") print(f"📁 保存目录: {self.download_dir.absolute()}") if self.stats["total_images"] > 0: success_rate = (self.stats["downloaded_images"] / self.stats["total_images"]) * 100 print(f"🎉 成功率: {success_rate:.1f}%") print("="*50) # 显示增量管理器状态 if hasattr(self, 'incremental_manager'): self.incremental_manager.print_mapping_summary() async def main(): """主函数""" print("🎀 启动HelloKitty图片下载器...") # 导入并使用enhanced_channel_scraper try: from collector.enhanced_channel_scraper import EnhancedQQChannelScraper from core.config import QQChannelConfig # 初始化配置和scraper config = QQChannelConfig() scraper = EnhancedQQChannelScraper(config) # HelloKitty频道URL channel_url = "https://pd.qq.com/g/5yy11f95s1" print(f"🔍 开始抓取HelloKitty频道: {channel_url}") # 抓取帖子 posts = await scraper.scrape_channel_posts(channel_url) if not posts: print("❌ 没有抓取到任何帖子") return print(f"✅ 成功抓取到 {len(posts)} 个帖子") # 初始化下载器 downloader = CompleteHelloKittyDownloader() # 下载所有图片 await downloader.download_all_images(posts) # 显示统计信息 downloader.print_statistics() # 检查下载目录 downloaded_files = list(downloader.download_dir.glob("*")) if downloaded_files: print(f"\n📂 下载的文件:") for i, file in enumerate(downloaded_files[:10], 1): # 只显示前10个 size_kb = file.stat().st_size / 1024 print(f" {i:2d}. {file.name} ({size_kb:.1f} KB)") if len(downloaded_files) > 10: print(f" ... 还有 {len(downloaded_files) - 10} 个文件") except ImportError as e: print(f"❌ 导入错误: {e}") print("请确保enhanced_channel_scraper.py已正确创建") except Exception as e: logger.error(f"❌ 运行失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/elleys/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server