trigger_crawl
Manually initiate a web scraping task to collect trending news from specified platforms, with options to save data locally and include URLs.
Instructions
Manually trigger a one-off crawl task (with optional persistence).
Args:
- platforms: list of platform IDs, e.g. ['zhihu', 'weibo', 'douyin']. If omitted, all platforms configured in config.yaml are crawled; supported platforms come from the platforms section of config/config.yaml. Each platform has a name field (e.g. "知乎", "微博") so the AI can identify it. Note: platforms that fail are listed in the failed_platforms field of the result.
- save_to_local: whether to save results to the local output directory; default False.
- include_url: whether to include URL links; default False (saves tokens).
Returns: task status information as JSON, containing:
- platforms: list of platforms crawled successfully
- failed_platforms: list of platforms that failed (if any)
- total_news: total number of news items crawled
- data: the news data
Examples:
- Ad-hoc crawl: trigger_crawl(platforms=['zhihu'])
- Crawl and save: trigger_crawl(platforms=['weibo'], save_to_local=True)
- Default platforms: trigger_crawl()  # crawls all platforms configured in config.yaml
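For orientation, a minimal sketch of the JSON payload this tool returns, assuming a single successful platform (the field names come from the Returns description and the handler excerpt below; the values are illustrative):

```json
{
  "platforms": ["zhihu"],
  "failed_platforms": [],
  "total_news": 2,
  "data": [
    {"platform_id": "zhihu", "platform_name": "知乎", "title": "…", "ranks": [1]},
    {"platform_id": "zhihu", "platform_name": "知乎", "title": "…", "ranks": [2]}
  ]
}
```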
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| platforms | No | List of platform IDs to crawl, e.g. ['zhihu', 'weibo']; omit to crawl all configured platforms | all platforms in config.yaml |
| save_to_local | No | Whether to save results to the local output directory | False |
| include_url | No | Whether to include URL links in the result (omitting them saves tokens) | False |
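For clients invoking the tool over the MCP protocol directly, a minimal sketch of a `tools/call` request carrying these arguments (the argument values are illustrative; this assumes a standard MCP JSON-RPC transport):

```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "trigger_crawl",
    "arguments": {
      "platforms": ["zhihu", "weibo"],
      "save_to_local": false,
      "include_url": false
    }
  }
}
```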
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | JSON string with the task status: platforms, failed_platforms, total_news, data | |
Implementation Reference
- mcp_server/tools/system.py:68-235 (handler): the `trigger_crawl` method of the `SystemManagementTools` class implements the tool logic: it crawls the configured platforms, manages retries, and optionally saves the results.
```python
def trigger_crawl(
    self,
    platforms: Optional[List[str]] = None,
    save_to_local: bool = False,
    include_url: bool = False,
) -> Dict:
    """
    Manually trigger a one-off crawl task (with optional persistence).

    Args:
        platforms: List of platform IDs; if empty, all platforms are crawled
        save_to_local: Whether to save to the local output directory; default False
        include_url: Whether to include URL links; default False (saves tokens)

    Returns:
        Crawl result dict containing the news data and saved file paths (if saved)

    Example:
        >>> tools = SystemManagementTools()
        >>> # Ad-hoc crawl, no persistence
        >>> result = tools.trigger_crawl(platforms=['zhihu', 'weibo'])
        >>> print(result['data'])
        >>> # Crawl and save locally
        >>> result = tools.trigger_crawl(platforms=['zhihu'], save_to_local=True)
        >>> print(result['saved_files'])
    """
    try:
        import json
        import time
        import random
        import requests
        from datetime import datetime
        import pytz
        import yaml

        # Validate arguments
        platforms = validate_platforms(platforms)

        # Load the config file
        config_path = self.project_root / "config" / "config.yaml"
        if not config_path.exists():
            raise CrawlTaskError(
                "Config file not found",
                suggestion=f"Make sure the config file exists: {config_path}"
            )

        # Read the config
        with open(config_path, "r", encoding="utf-8") as f:
            config_data = yaml.safe_load(f)

        # Get the platform configuration
        all_platforms = config_data.get("platforms", [])
        if not all_platforms:
            raise CrawlTaskError(
                "No platforms configured",
                suggestion="Check the platforms section of config/config.yaml"
            )

        # Filter to the requested platforms
        if platforms:
            target_platforms = [p for p in all_platforms if p["id"] in platforms]
            if not target_platforms:
                raise CrawlTaskError(
                    f"Unknown platforms: {platforms}",
                    suggestion=f"Available platforms: {[p['id'] for p in all_platforms]}"
                )
        else:
            target_platforms = all_platforms

        # Request interval between platforms (milliseconds)
        request_interval = config_data.get("crawler", {}).get("request_interval", 100)

        # Build the platform ID list, pairing IDs with display names when present
        ids = []
        for platform in target_platforms:
            if "name" in platform:
                ids.append((platform["id"], platform["name"]))
            else:
                ids.append(platform["id"])

        print(f"Starting ad-hoc crawl, platforms: {[p.get('name', p['id']) for p in target_platforms]}")

        # Crawl the data
        results = {}
        id_to_name = {}
        failed_ids = []

        for i, id_info in enumerate(ids):
            if isinstance(id_info, tuple):
                id_value, name = id_info
            else:
                id_value = id_info
                name = id_value
            id_to_name[id_value] = name

            # Build the request URL
            url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest"
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Accept": "application/json, text/plain, */*",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Connection": "keep-alive",
                "Cache-Control": "no-cache",
            }

            # Retry loop
            max_retries = 2
            retries = 0
            success = False

            while retries <= max_retries and not success:
                try:
                    response = requests.get(url, headers=headers, timeout=10)
                    response.raise_for_status()

                    data_text = response.text
                    data_json = json.loads(data_text)
                    status = data_json.get("status", "unknown")
                    if status not in ["success", "cache"]:
                        raise ValueError(f"Unexpected response status: {status}")

                    status_info = "fresh data" if status == "success" else "cached data"
                    print(f"Fetched {id_value} successfully ({status_info})")

                    # Parse the items, merging duplicate titles into one entry with multiple ranks
                    results[id_value] = {}
                    for index, item in enumerate(data_json.get("items", []), 1):
                        title = item["title"]
                        url_link = item.get("url", "")
                        mobile_url = item.get("mobileUrl", "")
                        if title in results[id_value]:
                            results[id_value][title]["ranks"].append(index)
                        else:
                            results[id_value][title] = {
                                "ranks": [index],
                                "url": url_link,
                                "mobileUrl": mobile_url,
                            }
                    success = True

                except Exception as e:
                    retries += 1
                    if retries <= max_retries:
                        wait_time = random.uniform(3, 5)
                        print(f"Request for {id_value} failed: {e}. Retrying in {wait_time:.2f}s...")
                        time.sleep(wait_time)
                    else:
                        print(f"Request for {id_value} failed: {e}")
                        failed_ids.append(id_value)

            # Pause between requests, jittering around the configured interval
            if i < len(ids) - 1:
                actual_interval = request_interval + random.randint(-10, 20)
                actual_interval = max(50, actual_interval)
                time.sleep(actual_interval / 1000)

        # Format the return data
        news_data = []
        for platform_id, titles_data in results.items():
            platform_name = id_to_name.get(platform_id, platform_id)
            for title, info in titles_data.items():
                news_item = {
                    "platform_id": platform_id,
                    "platform_name": platform_name,
                    "title": title,
                    "ranks": info["ranks"]
                }
                # ... excerpt truncated here; the full handler continues to line 235
```
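The handler delegates argument checking to a `validate_platforms` helper that is not shown in this excerpt. A minimal sketch of what such a helper might look like, assuming it normalizes the list and rejects empty or non-string entries (the name appears in the source; this body is an assumption, and the real helper may raise a project-specific error such as CrawlTaskError instead):

```python
from typing import List, Optional


def validate_platforms(platforms: Optional[List[str]]) -> Optional[List[str]]:
    """Hypothetical sketch of the validation helper used by trigger_crawl.

    The actual implementation lives elsewhere in mcp_server; this body
    is an assumption for illustration only.
    """
    if platforms is None:
        return None  # caller falls back to all configured platforms
    if not isinstance(platforms, list):
        raise TypeError("platforms must be a list of platform ID strings")
    cleaned = [p.strip() for p in platforms if isinstance(p, str) and p.strip()]
    if not cleaned:
        raise ValueError("platforms must contain at least one non-empty platform ID")
    return cleaned
```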