analyze_data_insights

Analyze platform comparisons, activity statistics, and keyword co-occurrence patterns to extract actionable insights from aggregated trend data.

Instructions

Unified data-insight analysis tool that consolidates multiple analysis modes.

Args:
- insight_type: Insight type. One of:
  - "platform_compare": platform comparison (compares how much attention different platforms give a topic)
  - "platform_activity": platform activity statistics (publishing frequency and active hours per platform)
  - "keyword_cooccur": keyword co-occurrence analysis (patterns of keywords appearing together)
- topic: Topic keyword (optional; used in platform_compare mode)
- date_range: [object] Date range (optional)
  - Format: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
  - Example: {"start": "2025-01-01", "end": "2025-01-07"}
  - Important: must be an object; an integer is not accepted
- min_frequency: Minimum co-occurrence count (keyword_cooccur mode); default 3
- top_n: Number of top results to return (keyword_cooccur mode); default 20

Returns: Data-insight analysis result as JSON.

Examples:
- analyze_data_insights(insight_type="platform_compare", topic="人工智能")
- analyze_data_insights(insight_type="platform_activity", date_range={"start": "2025-01-01", "end": "2025-01-07"})
- analyze_data_insights(insight_type="keyword_cooccur", min_frequency=5, top_n=15)
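The note that date_range must be an object (never an integer) can be illustrated with a small standalone validator. `parse_date_range` below is a hypothetical helper written for this sketch, not part of the tool's API:

```python
from datetime import datetime

def parse_date_range(date_range):
    """Accept only the documented object form and return (start, end) datetimes."""
    if not isinstance(date_range, dict):
        # Integers (or any other non-object value) are rejected, per the schema note
        raise TypeError('date_range must be an object like {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}')
    start = datetime.strptime(date_range["start"], "%Y-%m-%d")
    end = datetime.strptime(date_range["end"], "%Y-%m-%d")
    if start > end:
        raise ValueError("start must not be after end")
    return start, end

start, end = parse_date_range({"start": "2025-01-01", "end": "2025-01-07"})
print(start.date(), end.date())  # 2025-01-01 2025-01-07
```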

Input Schema

JSON Schema

Name           Required  Description                                         Default
insight_type   No        Insight type                                        platform_compare
topic          No        Topic keyword (platform_compare mode)               -
date_range     No        Date range object with "start" and "end"            -
min_frequency  No        Minimum co-occurrence count (keyword_cooccur mode)  3
top_n          No        Number of top results (keyword_cooccur mode)        20

Implementation Reference

  • Main implementation of analyze_data_insights_unified - the core handler that routes to different analysis types (platform_compare, platform_activity, keyword_cooccur) based on the insight_type parameter
    def analyze_data_insights_unified(
        self,
        insight_type: str = "platform_compare",
        topic: Optional[str] = None,
        date_range: Optional[Union[Dict[str, str], str]] = None,
        min_frequency: int = 3,
        top_n: int = 20
    ) -> Dict:
        """
        Unified data-insight analysis tool that consolidates multiple analysis modes.

        Args:
            insight_type: Insight type. One of:
                - "platform_compare": platform comparison (attention each platform gives a topic)
                - "platform_activity": platform activity statistics (publishing frequency and active hours)
                - "keyword_cooccur": keyword co-occurrence analysis (keywords that appear together)
            topic: Topic keyword (optional; platform_compare mode)
            date_range: Date range, format: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
            min_frequency: Minimum co-occurrence count (keyword_cooccur mode); default 3
            top_n: Number of top results to return (keyword_cooccur mode); default 20

        Returns:
            Dict with the data-insight analysis result

        Examples:
            - analyze_data_insights_unified(insight_type="platform_compare", topic="人工智能")
            - analyze_data_insights_unified(insight_type="platform_activity", date_range={...})
            - analyze_data_insights_unified(insight_type="keyword_cooccur", min_frequency=5)
        """
        try:
            # Validate parameters
            if insight_type not in ["platform_compare", "platform_activity", "keyword_cooccur"]:
                raise InvalidParameterError(
                    f"Invalid insight type: {insight_type}",
                    suggestion="Supported types: platform_compare, platform_activity, keyword_cooccur"
                )

            # Dispatch to the handler for the requested insight type
            if insight_type == "platform_compare":
                return self.compare_platforms(
                    topic=topic,
                    date_range=date_range
                )
            elif insight_type == "platform_activity":
                return self.get_platform_activity_stats(
                    date_range=date_range
                )
            else:  # keyword_cooccur
                return self.analyze_keyword_cooccurrence(
                    min_frequency=min_frequency,
                    top_n=top_n
                )

        except MCPError as e:
            return {
                "success": False,
                "error": e.to_dict()
            }
        except Exception as e:
            return {
                "success": False,
                "error": {
                    "code": "INTERNAL_ERROR",
                    "message": str(e)
                }
            }
  • MCP tool registration - the @mcp.tool decorated async function analyze_data_insights that wraps the AnalyticsTools.analyze_data_insights_unified method for MCP protocol exposure
    @mcp.tool
    async def analyze_data_insights(
        insight_type: str = "platform_compare",
        topic: Optional[str] = None,
        date_range: Optional[Union[Dict[str, str], str]] = None,
        min_frequency: int = 3,
        top_n: int = 20
    ) -> str:
        """
        Unified data-insight analysis tool that consolidates multiple analysis modes.

        Args:
            insight_type: Insight type. One of:
                - "platform_compare": platform comparison (attention each platform gives a topic)
                - "platform_activity": platform activity statistics (publishing frequency and active hours)
                - "keyword_cooccur": keyword co-occurrence analysis (keywords that appear together)
            topic: Topic keyword (optional; platform_compare mode)
            date_range: **[object]** Date range (optional)
                        - **Format**: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
                        - **Example**: {"start": "2025-01-01", "end": "2025-01-07"}
                        - **Important**: must be an object; an integer is not accepted
            min_frequency: Minimum co-occurrence count (keyword_cooccur mode); default 3
            top_n: Number of top results to return (keyword_cooccur mode); default 20

        Returns:
            Data-insight analysis result as a JSON string

        Examples:
            - analyze_data_insights(insight_type="platform_compare", topic="人工智能")
            - analyze_data_insights(insight_type="platform_activity", date_range={"start": "2025-01-01", "end": "2025-01-07"})
            - analyze_data_insights(insight_type="keyword_cooccur", min_frequency=5, top_n=15)
        """
        tools = _get_tools()
        result = await asyncio.to_thread(
            tools['analytics'].analyze_data_insights_unified,
            insight_type=insight_type,
            topic=topic,
            date_range=date_range,
            min_frequency=min_frequency,
            top_n=top_n
        )
        return json.dumps(result, ensure_ascii=False, indent=2)
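The registration above follows a common pattern: run the blocking analytics method off the event loop with asyncio.to_thread, then serialize with ensure_ascii=False so CJK text stays readable. A minimal, self-contained sketch of that pattern, with an illustrative stand-in for the analytics method:

```python
import asyncio
import json

def blocking_analysis(topic: str) -> dict:
    # Stand-in for a synchronous analytics method (illustrative only)
    return {"success": True, "topic": topic}

async def tool_wrapper(topic: str) -> str:
    # Offload the blocking call so the event loop stays responsive
    result = await asyncio.to_thread(blocking_analysis, topic)
    # ensure_ascii=False keeps non-ASCII text literal instead of \uXXXX escapes
    return json.dumps(result, ensure_ascii=False, indent=2)

print(asyncio.run(tool_wrapper("人工智能")))
```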
  • Supporting handler - compare_platforms method for platform comparison analysis (called when insight_type='platform_compare')
    def compare_platforms(
        self,
        topic: Optional[str] = None,
        date_range: Optional[Union[Dict[str, str], str]] = None
    ) -> Dict:
        """
        Platform comparison - compare how much attention each platform gives a topic.

        Args:
            topic: Topic keyword (optional; if omitted, overall activity is compared)
            date_range: Date range, format: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}

        Returns:
            Platform comparison result

        Examples:
            Example user questions:
            - "Compare how much attention each platform gives the '人工智能' topic"
            - "See whether Zhihu or Weibo covers tech news more heavily"
            - "Analyze how today's hot topics are distributed across platforms"

            Code example:
            >>> # Compare platforms (assuming today is 2025-11-17)
            >>> result = tools.compare_platforms(
            ...     topic="人工智能",
            ...     date_range={"start": "2025-11-08", "end": "2025-11-17"}
            ... )
            >>> print(result['platform_stats'])
        """
        try:
            # Validate parameters
            if topic:
                topic = validate_keyword(topic)
            date_range_tuple = validate_date_range(date_range)

            # Determine the date range
            if date_range_tuple:
                start_date, end_date = date_range_tuple
            else:
                start_date = end_date = datetime.now()

            # Collect per-platform data
            platform_stats = defaultdict(lambda: {
                "total_news": 0,
                "topic_mentions": 0,
                "unique_titles": set(),
                "top_keywords": Counter()
            })

            # Iterate over the date range
            current_date = start_date
            while current_date <= end_date:
                try:
                    all_titles, id_to_name, _ = self.data_service.parser.read_all_titles_for_date(
                        date=current_date
                    )

                    for platform_id, titles in all_titles.items():
                        platform_name = id_to_name.get(platform_id, platform_id)

                        for title in titles.keys():
                            platform_stats[platform_name]["total_news"] += 1
                            platform_stats[platform_name]["unique_titles"].add(title)

                            # If a topic was given, count news items that mention it
                            if topic and topic.lower() in title.lower():
                                platform_stats[platform_name]["topic_mentions"] += 1

                            # Extract keywords (simple tokenization)
                            keywords = self._extract_keywords(title)
                            platform_stats[platform_name]["top_keywords"].update(keywords)

                except DataNotFoundError:
                    pass

                current_date += timedelta(days=1)

            # Convert to a serializable format
            result_stats = {}
            for platform, stats in platform_stats.items():
                coverage_rate = 0
                if stats["total_news"] > 0:
                    coverage_rate = (stats["topic_mentions"] / stats["total_news"]) * 100

                result_stats[platform] = {
                    "total_news": stats["total_news"],
                    "topic_mentions": stats["topic_mentions"],
                    "unique_titles": len(stats["unique_titles"]),
                    "coverage_rate": round(coverage_rate, 2),
                    "top_keywords": [
                        {"keyword": k, "count": v}
                        for k, v in stats["top_keywords"].most_common(5)
                    ]
                }

            # Find hot topics unique to each platform
            unique_topics = self._find_unique_topics(platform_stats)

            return {
                "success": True,
                "topic": topic,
                "date_range": {
                    "start": start_date.strftime("%Y-%m-%d"),
                    "end": end_date.strftime("%Y-%m-%d")
                },
                "platform_stats": result_stats,
                "unique_topics": unique_topics,
                "total_platforms": len(result_stats)
            }

        except MCPError as e:
            return {
                "success": False,
                "error": e.to_dict()
            }
        except Exception as e:
            return {
                "success": False,
                "error": {
                    "code": "INTERNAL_ERROR",
                    "message": str(e)
                }
            }
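The coverage_rate formula used by compare_platforms (topic_mentions / total_news * 100, rounded to two decimals, with a case-insensitive substring match) can be checked on a toy input. The platforms and titles here are made up for illustration:

```python
# Toy input: two platforms with a handful of (fabricated) titles each
titles_by_platform = {
    "zhihu": ["AI 芯片新突破", "今日股市行情", "AI 监管新规"],
    "weibo": ["明星动态", "AI 绘画走红"],
}
topic = "ai"

stats = {}
for platform, titles in titles_by_platform.items():
    total = len(titles)
    # Case-insensitive substring match, mirroring `topic.lower() in title.lower()`
    mentions = sum(1 for t in titles if topic in t.lower())
    stats[platform] = {
        "total_news": total,
        "topic_mentions": mentions,
        # Same formula as compare_platforms: mentions / total * 100, rounded
        "coverage_rate": round(mentions / total * 100, 2) if total else 0,
    }

print(stats["zhihu"]["coverage_rate"])  # 66.67
print(stats["weibo"]["coverage_rate"])  # 50.0
```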
  • Supporting handler - get_platform_activity_stats method for platform activity statistics (called when insight_type='platform_activity')
    def get_platform_activity_stats(
        self,
        date_range: Optional[Union[Dict[str, str], str]] = None
    ) -> Dict:
        """
        Platform activity statistics - publishing frequency and active hours per platform.

        Args:
            date_range: Date range (optional)

        Returns:
            Platform activity statistics result

        Examples:
            Example user questions:
            - "Summarize each platform's activity today"
            - "See which platform updates most frequently"
            - "Analyze each platform's publishing-time patterns"

            Code example:
            >>> # View platform activity (assuming today is 2025-11-17)
            >>> result = tools.get_platform_activity_stats(
            ...     date_range={"start": "2025-11-08", "end": "2025-11-17"}
            ... )
            >>> print(result['platform_activity'])
        """
        try:
            # Validate parameters
            date_range_tuple = validate_date_range(date_range)

            # Determine the date range
            if date_range_tuple:
                start_date, end_date = date_range_tuple
            else:
                start_date = end_date = datetime.now()

            # Accumulate per-platform activity
            platform_activity = defaultdict(lambda: {
                "total_updates": 0,
                "days_active": set(),
                "news_count": 0,
                "hourly_distribution": Counter()
            })

            # Iterate over the date range
            current_date = start_date
            while current_date <= end_date:
                try:
                    all_titles, id_to_name, timestamps = self.data_service.parser.read_all_titles_for_date(
                        date=current_date
                    )

                    for platform_id, titles in all_titles.items():
                        platform_name = id_to_name.get(platform_id, platform_id)

                        platform_activity[platform_name]["news_count"] += len(titles)
                        platform_activity[platform_name]["days_active"].add(current_date.strftime("%Y-%m-%d"))

                        # Count updates (based on the number of files)
                        platform_activity[platform_name]["total_updates"] += len(timestamps)

                        # Build the hourly distribution (from times embedded in filenames)
                        for filename in timestamps.keys():
                            # Parse the hour from the filename (format: HHMM.txt)
                            match = re.match(r'(\d{2})(\d{2})\.txt', filename)
                            if match:
                                hour = int(match.group(1))
                                platform_activity[platform_name]["hourly_distribution"][hour] += 1

                except DataNotFoundError:
                    pass

                current_date += timedelta(days=1)

            # Convert to a serializable format
            result_activity = {}
            for platform, stats in platform_activity.items():
                days_count = len(stats["days_active"])
                avg_news_per_day = stats["news_count"] / days_count if days_count > 0 else 0

                # Find the most active hours
                most_active_hours = stats["hourly_distribution"].most_common(3)

                result_activity[platform] = {
                    "total_updates": stats["total_updates"],
                    "news_count": stats["news_count"],
                    "days_active": days_count,
                    "avg_news_per_day": round(avg_news_per_day, 2),
                    "most_active_hours": [
                        {"hour": f"{hour:02d}:00", "count": count}
                        for hour, count in most_active_hours
                    ],
                    "activity_score": round(stats["news_count"] / max(days_count, 1), 2)
                }

            # Sort platforms by activity score
            sorted_platforms = sorted(
                result_activity.items(),
                key=lambda x: x[1]["activity_score"],
                reverse=True
            )

            return {
                "success": True,
                "date_range": {
                    "start": start_date.strftime("%Y-%m-%d"),
                    "end": end_date.strftime("%Y-%m-%d")
                },
                "platform_activity": dict(sorted_platforms),
                "most_active_platform": sorted_platforms[0][0] if sorted_platforms else None,
                "total_platforms": len(result_activity)
            }

        except MCPError as e:
            return {
                "success": False,
                "error": e.to_dict()
            }
        except Exception as e:
            return {
                "success": False,
                "error": {
                    "code": "INTERNAL_ERROR",
                    "message": str(e)
                }
            }
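The hourly distribution above depends on the HHMM.txt filename convention. The regex step can be exercised on its own with a few sample filenames (the filenames below are invented for the sketch; non-matching names are simply skipped, as in the handler):

```python
import re
from collections import Counter

filenames = ["0830.txt", "0915.txt", "0930.txt", "1405.txt", "readme.md"]

hourly = Counter()
for filename in filenames:
    # Same pattern as get_platform_activity_stats: two hour digits, two minute digits
    match = re.match(r'(\d{2})(\d{2})\.txt', filename)
    if match:
        hourly[int(match.group(1))] += 1

print(hourly.most_common(2))  # [(9, 2), (8, 1)] -- hour 09 appears twice
```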
  • Supporting handler - analyze_keyword_cooccurrence method for keyword co-occurrence analysis (called when insight_type='keyword_cooccur')
    def analyze_keyword_cooccurrence(
        self,
        min_frequency: int = 3,
        top_n: int = 20
    ) -> Dict:
        """
        Keyword co-occurrence analysis - find which keywords often appear together.

        Args:
            min_frequency: Minimum co-occurrence count
            top_n: Number of top keyword pairs to return

        Returns:
            Keyword co-occurrence analysis result

        Examples:
            Example user questions:
            - "Analyze which keywords frequently appear together"
            - "See which words often appear alongside '人工智能'"
            - "Find keyword associations in today's news"

            Code example:
            >>> tools = AnalyticsTools()
            >>> result = tools.analyze_keyword_cooccurrence(
            ...     min_frequency=5,
            ...     top_n=15
            ... )
            >>> print(result['cooccurrence_pairs'])
        """
        try:
            # Validate parameters
            min_frequency = validate_limit(min_frequency, default=3, max_limit=100)
            top_n = validate_top_n(top_n, default=20)

            # Read today's data
            all_titles, _, _ = self.data_service.parser.read_all_titles_for_date()

            # Co-occurrence counters
            cooccurrence = Counter()
            keyword_titles = defaultdict(list)

            for platform_id, titles in all_titles.items():
                for title in titles.keys():
                    # Extract keywords
                    keywords = self._extract_keywords(title)

                    # Record the titles each keyword appears in
                    for kw in keywords:
                        keyword_titles[kw].append(title)

                    # Count pairwise co-occurrences
                    if len(keywords) >= 2:
                        for i, kw1 in enumerate(keywords):
                            for kw2 in keywords[i+1:]:
                                # Sort each pair so (a, b) and (b, a) count together
                                pair = tuple(sorted([kw1, kw2]))
                                cooccurrence[pair] += 1

            # Filter out low-frequency pairs
            filtered_pairs = [
                (pair, count) for pair, count in cooccurrence.items()
                if count >= min_frequency
            ]

            # Sort and take the top N
            top_pairs = sorted(filtered_pairs, key=lambda x: x[1], reverse=True)[:top_n]

            # Build the result
            result_pairs = []
            for (kw1, kw2), count in top_pairs:
                # Sample titles that contain both keywords
                titles_with_both = [
                    title for title in keyword_titles[kw1]
                    if kw2 in self._extract_keywords(title)
                ]

                result_pairs.append({
                    "keyword1": kw1,
                    "keyword2": kw2,
                    "cooccurrence_count": count,
                    "sample_titles": titles_with_both[:3]
                })

            return {
                "success": True,
                "summary": {
                    "description": "Keyword co-occurrence analysis result",
                    "total": len(result_pairs),
                    "min_frequency": min_frequency,
                    "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                },
                "data": result_pairs
            }

        except MCPError as e:
            return {
                "success": False,
                "error": e.to_dict()
            }
        except Exception as e:
            return {
                "success": False,
                "error": {
                    "code": "INTERNAL_ERROR",
                    "message": str(e)
                }
            }
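The pair-counting core of analyze_keyword_cooccurrence can be reduced to a few lines. The `extract_keywords` function below is a stub tokenizer standing in for the tool's private `_extract_keywords` helper, and the titles are invented:

```python
from collections import Counter
from itertools import combinations

def extract_keywords(title):
    # Stub tokenizer (whitespace split); the real tool uses a private helper
    return [w for w in title.split() if len(w) > 1]

titles = [
    "AI chip breakthrough",
    "AI chip export rules",
    "chip market rally",
]

cooccurrence = Counter()
for title in titles:
    keywords = extract_keywords(title)
    # combinations() yields each unordered pair once, like the nested loop
    for kw1, kw2 in combinations(keywords, 2):
        # Sort each pair so ("AI", "chip") and ("chip", "AI") count together
        cooccurrence[tuple(sorted((kw1, kw2)))] += 1

print(cooccurrence.most_common(1))  # [(('AI', 'chip'), 2)]
```

Using itertools.combinations here is behaviorally equivalent to the handler's index-based nested loop over `keywords[i+1:]`.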
