
aggregate_news

Aggregate and deduplicate similar news articles from multiple platforms into consolidated reports, showing cross-platform coverage and combined engagement metrics.

Instructions

Cross-platform news aggregation - deduplicates and merges similar news.

Merges reports of the same event from different platforms into a single aggregated item, showing cross-platform coverage and combined popularity.

Args:
- date_range: date range; today when omitted
- platforms: platform ID list, e.g. ['zhihu', 'weibo']; all platforms when omitted
- similarity_threshold: similarity threshold, 0.3-1.0, default 0.7 (higher is stricter)
- limit: number of aggregated news items to return, default 50
- include_url: whether to include URL links, default False

Returns: aggregation results as JSON, including deduplication statistics, the aggregated news list, and platform coverage statistics

Examples:
- aggregate_news()
- aggregate_news(similarity_threshold=0.8)

Input Schema

Name                  Required  Description                                          Default
date_range            No        Date range; today when omitted                       today
platforms             No        Platform ID list, e.g. ['zhihu', 'weibo']            all platforms
similarity_threshold  No        Similarity threshold, 0.3-1.0 (higher is stricter)   0.7
limit                 No        Number of aggregated news items to return            50
include_url           No        Whether to include URL links                         False
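For reference, a parameter set that satisfies the schema above and the validation ranges described further down the page (the dates and values are purely illustrative):

```python
# Illustrative arguments for aggregate_news; every field is optional.
args = {
    "date_range": {"start": "2024-01-01", "end": "2024-01-07"},  # omit to query today
    "platforms": ["zhihu", "weibo"],  # omit to use all configured platforms
    "similarity_threshold": 0.8,      # must fall within 0.3-1.0 (default 0.7)
    "limit": 20,                      # default 50
    "include_url": False,             # default False
}

assert 0.3 <= args["similarity_threshold"] <= 1.0
```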

Implementation Reference

  • The actual implementation of aggregate_news in the AnalyticsTools class. This method performs cross-platform news aggregation with deduplication, collecting news data, validating parameters, and aggregating similar news items using similarity thresholds. It returns aggregated results with statistics including deduplication rate and platform coverage.
    def aggregate_news(
        self,
        date_range: Optional[Union[Dict[str, str], str]] = None,
        platforms: Optional[List[str]] = None,
        similarity_threshold: float = 0.7,
        limit: int = 50,
        include_url: bool = False
    ) -> Dict:
        """
        Cross-platform news aggregation - deduplicate and merge similar news.

        Merges reports of the same event from different platforms into one
        aggregated item, showing per-platform coverage and combined popularity.

        Args:
            date_range: date range (optional)
                - omitted: query today
                - {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}: a date range
            platforms: platform filter list, e.g. ['zhihu', 'weibo']
            similarity_threshold: similarity threshold between 0 and 1, default 0.7
            limit: number of aggregated news items to return, default 50
            include_url: whether to include URL links, default False

        Returns:
            Aggregation result dict containing:
            - aggregated_news: the aggregated news list
            - statistics: aggregation statistics
        """
        try:
            # Validate parameters
            platforms = validate_platforms(platforms)
            similarity_threshold = validate_threshold(
                similarity_threshold, default=0.7, min_value=0.3, max_value=1.0
            )
            limit = validate_limit(limit, default=50)
    
            # Resolve the date range
            if date_range:
                date_range_tuple = validate_date_range(date_range)
                start_date, end_date = date_range_tuple
            else:
                start_date = end_date = datetime.now()
    
            # Collect news items for each date
            all_news = []
            current_date = start_date
    
            while current_date <= end_date:
                try:
                    all_titles, id_to_name, _ = self.data_service.parser.read_all_titles_for_date(
                        date=current_date,
                        platform_ids=platforms
                    )
    
                    for platform_id, titles in all_titles.items():
                        platform_name = id_to_name.get(platform_id, platform_id)
    
                        for title, info in titles.items():
                            news_item = {
                                "title": title,
                                "platform": platform_id,
                                "platform_name": platform_name,
                                "date": current_date.strftime("%Y-%m-%d"),
                                "ranks": info.get("ranks", []),
                                "count": len(info.get("ranks", [])),
                                "rank": info["ranks"][0] if info.get("ranks") else 999
                            }
    
                            if include_url:
                                news_item["url"] = info.get("url", "")
                                news_item["mobileUrl"] = info.get("mobileUrl", "")
    
                            # Compute the item's weight
                            news_item["weight"] = calculate_news_weight(news_item)
                            all_news.append(news_item)
    
                except DataNotFoundError:
                    pass
    
                current_date += timedelta(days=1)
    
            if not all_news:
                return {
                    "success": True,
                    "summary": {
                        "description": "Cross-platform news aggregation results",
                        "total": 0,
                        "returned": 0
                    },
                    "data": [],
                    "message": "No news data found"
                }
    
            # Run the aggregation
            aggregated = self._aggregate_similar_news(
                all_news, similarity_threshold, include_url
            )
    
            # Sort by aggregate weight
            aggregated.sort(key=lambda x: x["aggregate_weight"], reverse=True)
    
            # Limit the number returned
            results = aggregated[:limit]
    
            # Summary statistics
            total_original = len(all_news)
            total_aggregated = len(aggregated)
            dedup_rate = 1 - (total_aggregated / total_original) if total_original > 0 else 0
    
            platform_coverage = Counter()
            for item in aggregated:
                for p in item["platforms"]:
                    platform_coverage[p] += 1
    
            return {
                "success": True,
                "summary": {
                    "description": "Cross-platform news aggregation results",
                    "original_count": total_original,
                    "aggregated_count": total_aggregated,
                    "returned": len(results),
                    "deduplication_rate": f"{dedup_rate * 100:.1f}%",
                    "similarity_threshold": similarity_threshold,
                    "date_range": {
                        "start": start_date.strftime("%Y-%m-%d"),
                        "end": end_date.strftime("%Y-%m-%d")
                    }
                },
                "data": results,
                "statistics": {
                    "platform_coverage": dict(platform_coverage),
                    "multi_platform_news": len([a for a in aggregated if len(a["platforms"]) > 1]),
                    "single_platform_news": len([a for a in aggregated if len(a["platforms"]) == 1])
                }
            }
    
        except MCPError as e:
            return {"success": False, "error": e.to_dict()}
        except Exception as e:
            return {"success": False, "error": {"code": "INTERNAL_ERROR", "message": str(e)}}
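The statistics step above can be reproduced in isolation; a minimal sketch of the deduplication rate and the `Counter`-based platform coverage, with illustrative sample numbers:

```python
from collections import Counter

# Mirror of the statistics step: 10 raw items collapsed into 6 groups.
total_original, total_aggregated = 10, 6
dedup_rate = 1 - (total_aggregated / total_original) if total_original > 0 else 0
print(f"{dedup_rate * 100:.1f}%")  # -> 40.0%

# Platform coverage counts how many aggregated groups each platform appears in.
aggregated = [
    {"platforms": ["Zhihu", "Weibo"]},
    {"platforms": ["Weibo"]},
    {"platforms": ["Zhihu"]},
]
platform_coverage = Counter()
for item in aggregated:
    for p in item["platforms"]:
        platform_coverage[p] += 1
print(dict(platform_coverage))  # -> {'Zhihu': 2, 'Weibo': 2}
```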
  • MCP tool registration and handler wrapper for aggregate_news. Decorated with @mcp.tool, this async function serves as the MCP endpoint, validates parameters, gets the analytics tools instance, and delegates to the actual implementation in a separate thread using asyncio.to_thread.
    @mcp.tool
    async def aggregate_news(
        date_range: Optional[Union[Dict[str, str], str]] = None,
        platforms: Optional[List[str]] = None,
        similarity_threshold: float = 0.7,
        limit: int = 50,
        include_url: bool = False
    ) -> str:
        """
        Cross-platform news aggregation - deduplicate and merge similar news.

        Merges reports of the same event from different platforms into one
        aggregated item, showing cross-platform coverage and combined popularity.

        Args:
            date_range: date range; today when omitted
            platforms: platform ID list, e.g. ['zhihu', 'weibo']; all platforms when omitted
            similarity_threshold: similarity threshold, 0.3-1.0, default 0.7 (higher is stricter)
            limit: number of aggregated news items to return, default 50
            include_url: whether to include URL links, default False

        Returns:
            Aggregation results as JSON, including deduplication statistics,
            the aggregated news list, and platform coverage statistics

        Examples:
            - aggregate_news()
            - aggregate_news(similarity_threshold=0.8)
        """
        tools = _get_tools()
        result = await asyncio.to_thread(
            tools['analytics'].aggregate_news,
            date_range=date_range,
            platforms=platforms,
            similarity_threshold=similarity_threshold,
            limit=limit,
            include_url=include_url
        )
        return json.dumps(result, ensure_ascii=False, indent=2)
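The wrapper pattern above — offloading a blocking call with `asyncio.to_thread`, then serializing with `ensure_ascii=False` so Chinese text stays readable — can be demonstrated standalone (the `blocking_work` function below is a hypothetical stand-in for the synchronous `AnalyticsTools` method):

```python
import asyncio
import json

def blocking_work(limit: int) -> dict:
    # Hypothetical stand-in for the synchronous analytics method.
    return {"success": True, "summary": {"title": "跨平台新闻聚合结果", "returned": limit}}

async def endpoint(limit: int = 50) -> str:
    # asyncio.to_thread keeps the event loop responsive while the sync call runs.
    result = await asyncio.to_thread(blocking_work, limit)
    # ensure_ascii=False keeps non-ASCII characters readable instead of \uXXXX escapes.
    return json.dumps(result, ensure_ascii=False, indent=2)

payload = asyncio.run(endpoint(limit=5))
assert "跨平台新闻聚合结果" in payload  # not escaped
assert json.loads(payload)["summary"]["returned"] == 5
```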
  • Helper method _aggregate_similar_news that performs the core aggregation logic. Uses a two-layer filtering strategy: Jaccard similarity for fast pre-filtering and SequenceMatcher for precise similarity calculation. Groups similar news items from different platforms and calculates aggregate weights.
    def _aggregate_similar_news(
        self,
        news_list: List[Dict],
        threshold: float,
        include_url: bool
    ) -> List[Dict]:
        """
        Aggregate a news list by similarity.

        Two-layer filtering strategy: a fast Jaccard pre-filter followed by
        an exact SequenceMatcher comparison.

        Args:
            news_list: list of news items
            threshold: similarity threshold
            include_url: whether to include URLs

        Returns:
            The aggregated news list
        """
        if not news_list:
            return []
    
        # Precompute character sets for fast filtering
        prepared_news = []
        for news in news_list:
            char_set = set(news["title"])
            prepared_news.append({
                "data": news,
                "char_set": char_set,
                "set_len": len(char_set)
            })
    
        # Sort by weight
        sorted_items = sorted(prepared_news, key=lambda x: x["data"].get("weight", 0), reverse=True)
    
        aggregated = []
        used_indices = set()
        PRE_FILTER_RATIO = 0.5  # coefficient for the coarse pre-filter threshold
    
        for i, item in enumerate(sorted_items):
            if i in used_indices:
                continue
    
            news = item["data"]
            base_set = item["char_set"]
            base_len = item["set_len"]
    
            group = {
                "representative_title": news["title"],
                "platforms": [news["platform_name"]],
                "platform_ids": [news["platform"]],
                "dates": [news["date"]],
                "best_rank": news["rank"],
                "total_count": news["count"],
                "aggregate_weight": news.get("weight", 0),
                "sources": [{
                    "platform": news["platform_name"],
                    "rank": news["rank"],
                    "date": news["date"]
                }]
            }
    
            if include_url and news.get("url"):
                group["urls"] = [{
                    "platform": news["platform_name"],
                    "url": news.get("url", ""),
                    "mobileUrl": news.get("mobileUrl", "")
                }]
    
            used_indices.add(i)
    
            # Find similar news items
            for j in range(i + 1, len(sorted_items)):
                if j in used_indices:
                    continue
    
                compare_item = sorted_items[j]
                compare_set = compare_item["char_set"]
                compare_len = compare_item["set_len"]
    
                # Fast pre-filter: skip empty character sets
                if base_len == 0 or compare_len == 0:
                    continue

                # Fast pre-filter: length-ratio check
                if min(base_len, compare_len) / max(base_len, compare_len) < (threshold * PRE_FILTER_RATIO):
                    continue

                # Fast pre-filter: Jaccard similarity
                intersection = len(base_set & compare_set)
                union = len(base_set | compare_set)
                jaccard_sim = intersection / union if union > 0 else 0

                if jaccard_sim < (threshold * PRE_FILTER_RATIO):
                    continue

                # Exact comparison: SequenceMatcher
                other_news = compare_item["data"]
                real_similarity = self._calculate_similarity(news["title"], other_news["title"])

                if real_similarity >= threshold:
                    # Merge into the current group
                    if other_news["platform_name"] not in group["platforms"]:
                        group["platforms"].append(other_news["platform_name"])
                        group["platform_ids"].append(other_news["platform"])
    
                    if other_news["date"] not in group["dates"]:
                        group["dates"].append(other_news["date"])
    
                    group["best_rank"] = min(group["best_rank"], other_news["rank"])
                    group["total_count"] += other_news["count"]
                    group["aggregate_weight"] += other_news.get("weight", 0) * 0.5  # extra weight for duplicates
    
                    group["sources"].append({
                        "platform": other_news["platform_name"],
                        "rank": other_news["rank"],
                        "date": other_news["date"]
                    })
    
                    if include_url and other_news.get("url"):
                        if "urls" not in group:
                            group["urls"] = []
                        group["urls"].append({
                            "platform": other_news["platform_name"],
                            "url": other_news.get("url", ""),
                            "mobileUrl": other_news.get("mobileUrl", "")
                        })
    
                    used_indices.add(j)
    
            # Attach aggregation metadata
            group["platform_count"] = len(group["platforms"])
            group["is_cross_platform"] = len(group["platforms"]) > 1
    
            aggregated.append(group)
    
        return aggregated
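The `_calculate_similarity` method is referenced above but not reproduced on this page. A plausible sketch, assuming it wraps `difflib.SequenceMatcher.ratio`, shown alongside a standalone version of the Jaccard pre-filter (function names here are illustrative, not the project's actual helpers):

```python
from difflib import SequenceMatcher

def jaccard(a: str, b: str) -> float:
    # Coarse pre-filter: character-set overlap, cheap to compute.
    sa, sb = set(a), set(b)
    union = sa | sb
    return len(sa & sb) / len(union) if union else 0.0

def calculate_similarity(a: str, b: str) -> float:
    # Assumed shape of _calculate_similarity: exact ratio from SequenceMatcher.
    return SequenceMatcher(None, a, b).ratio()

t1 = "Team releases new model"
t2 = "Team releases new model today"
assert jaccard(t1, t2) > 0.5               # passes the coarse filter
assert calculate_similarity(t1, t2) > 0.8  # exact similarity above a 0.7 threshold
assert calculate_similarity(t1, t1) == 1.0
```

The coarse filter only needs set intersections, so it cheaply discards obviously dissimilar pairs before the quadratic-cost `SequenceMatcher` comparison runs.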
  • Schema validation for platforms parameter. The validate_platforms function accepts None (default platforms), list, or string input, validates against config.yaml supported platforms, and handles various input formats including JSON strings and comma-separated values.
    def validate_platforms(platforms: Optional[Union[List[str], str]]) -> List[str]:
        """
        Validate the platform list.

        Args:
            platforms: platform ID list or string; None means use all platforms
                       configured in config.yaml.
                       Supported formats:
                       - None: use the default platforms
                       - ["zhihu", "weibo"]: JSON array
                       - '["zhihu", "weibo"]': JSON array string
                       - "['zhihu', 'weibo']": Python list string
                       - "zhihu, weibo": comma-separated string
                       - "zhihu": single platform string

        Returns:
            The validated platform list

        Raises:
            InvalidParameterError: a platform is not supported

        Note:
            - When platforms is None, the platform list from config.yaml is returned
            - Platform IDs are validated against the platforms section of config.yaml
            - If the config fails to load, all platforms are allowed (graceful degradation)
        """
        supported_platforms = get_supported_platforms()
    
        if platforms is None:
            # Return the platforms configured in config.yaml (the user's defaults)
            return supported_platforms if supported_platforms else []

        # Accept string-encoded lists (some MCP clients serialize JSON arrays as strings)
        if isinstance(platforms, str):
            platforms = _parse_string_to_list(platforms)
            if not platforms:
                # Empty string, or empty after parsing: use the default platforms
                return supported_platforms if supported_platforms else []

        if not isinstance(platforms, list):
            raise InvalidParameterError("The platforms parameter must be a list")

        if not platforms:
            # Empty list: return the platforms configured in config.yaml
            return supported_platforms if supported_platforms else []

        # If config loading failed (supported_platforms is empty), allow all platforms
        if not supported_platforms:
            print("Warning: platform config not loaded, skipping platform validation")
            return platforms

        # Check every platform against the config
        invalid_platforms = [p for p in platforms if p not in supported_platforms]
        if invalid_platforms:
            raise InvalidParameterError(
                f"Unsupported platforms: {', '.join(invalid_platforms)}",
                suggestion=f"Supported platforms (from config.yaml): {', '.join(supported_platforms)}"
            )

        return platforms
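The `_parse_string_to_list` helper is referenced above but not shown on this page. A minimal sketch covering the input formats listed in the docstring (JSON array string, Python list string, comma-separated, single value); the function name and fallback order are assumptions:

```python
import ast
import json

def parse_string_to_list(value: str) -> list:
    # Sketch of the assumed _parse_string_to_list helper.
    value = value.strip()
    if not value:
        return []
    if value.startswith("["):
        # Try a JSON array first: '["zhihu", "weibo"]'
        try:
            return list(json.loads(value))
        except json.JSONDecodeError:
            # Fall back to a Python literal list: "['zhihu', 'weibo']"
            try:
                return list(ast.literal_eval(value))
            except (ValueError, SyntaxError):
                return []
    # Comma-separated or single value: "zhihu, weibo" / "zhihu"
    return [p.strip() for p in value.split(",") if p.strip()]

assert parse_string_to_list('["zhihu", "weibo"]') == ["zhihu", "weibo"]
assert parse_string_to_list("['zhihu', 'weibo']") == ["zhihu", "weibo"]
assert parse_string_to_list("zhihu, weibo") == ["zhihu", "weibo"]
assert parse_string_to_list("zhihu") == ["zhihu"]
```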
  • Schema validation for similarity_threshold parameter. The validate_threshold function validates the threshold value is within the specified range (default 0.3-1.0), handles string/int/float inputs, and provides meaningful error messages.
    def validate_threshold(
        threshold: Optional[Union[float, int, str]],
        default: float = 0.6,
        min_value: float = 0.0,
        max_value: float = 1.0,
        param_name: str = "threshold"
    ) -> float:
        """
        Validate a threshold parameter (float).

        Args:
            threshold: threshold value (float, int, or string)
            default: default value
            min_value: minimum allowed value
            max_value: maximum allowed value
            param_name: parameter name (used in error messages)

        Returns:
            The validated threshold

        Raises:
            InvalidParameterError: the parameter is invalid
        """
        if threshold is None:
            return default

        # Accept string-encoded numbers (some MCP clients serialize numbers as strings)
        if isinstance(threshold, str):
            threshold = _parse_string_to_float(threshold, param_name)

        # Promote integers to floats
        if isinstance(threshold, int):
            threshold = float(threshold)

        if not isinstance(threshold, float):
            raise InvalidParameterError(
                f"{param_name} must be a number",
                suggestion=f"Provide a number between {min_value} and {max_value}"
            )

        if threshold < min_value or threshold > max_value:
            raise InvalidParameterError(
                f"{param_name} must be between {min_value} and {max_value}, got: {threshold}",
                suggestion=f"Recommended value: {default}"
            )

        return threshold
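The `_parse_string_to_float` helper is likewise referenced but not shown. A minimal sketch of its assumed behavior, substituting a plain `ValueError` for the project's `InvalidParameterError`:

```python
def parse_string_to_float(value: str, param_name: str) -> float:
    # Sketch of the assumed _parse_string_to_float helper: convert a
    # string-encoded number, raising on anything non-numeric.
    # (The real helper presumably raises InvalidParameterError instead.)
    try:
        return float(value.strip())
    except ValueError:
        raise ValueError(f"{param_name} must be a number, got: {value!r}")

assert parse_string_to_float("0.7", "similarity_threshold") == 0.7
assert parse_string_to_float(" 1 ", "similarity_threshold") == 1.0
```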
