analyze_robot_data
Analyzes robot operational data to identify statistical patterns, trends, anomalies, and performance metrics for specific time periods.
Instructions
分析机器人数据
参数:
- robot_id: 机器人ID
- analysis_type: 分析类型,可选值包括"statistical", "trend", "anomaly", "performance"
- start_time: 开始时间戳
- end_time: 结束时间戳
返回:
- 分析结果
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| robot_id | Yes | ||
| analysis_type | Yes | ||
| start_time | No | ||
| end_time | No |
Implementation Reference
- The primary MCP tool handler for 'analyze_robot_data'. Registers the tool via @mcp.tool() decorator, validates parameters, loads robot data, maps analysis_type to AnalysisType enum, and delegates core analysis to AdvancedDataAnalyzer. Handles errors and formats response.@mcp.tool() def analyze_robot_data(robot_id: str, analysis_type: str, start_time: float = None, end_time: float = None): """ 分析机器人数据 参数: - robot_id: 机器人ID - analysis_type: 分析类型,可选值包括"statistical", "trend", "anomaly", "performance" - start_time: 开始时间戳 - end_time: 结束时间戳 返回: - 分析结果 """ try: if advanced_data_analyzer is None: return return_msg("高级数据分析器未初始化") # 映射分析类型字符串到枚举值 type_map = { "statistical": AnalysisType.STATISTICAL, "trend": AnalysisType.TREND, "anomaly": AnalysisType.ANOMALY, "performance": AnalysisType.PERFORMANCE } if analysis_type not in type_map: return return_msg(f"不支持的分析类型: {analysis_type}") # 加载数据 df = advanced_data_analyzer.load_data( robot_id=robot_id, start_time=start_time, end_time=end_time ) if df.empty: return return_msg({"error": "未找到数据"}) # 执行分析 analysis_params = {} if analysis_type == "trend" and 'timestamp' in df.columns: # 对于趋势分析,使用时间戳作为x轴 numeric_columns = df.select_dtypes(include=['float64', 'int64']).columns.tolist() if numeric_columns and numeric_columns[0] != 'timestamp': analysis_params = {'x_column': 'timestamp', 'y_column': numeric_columns[0]} result = advanced_data_analyzer.analyze( df, type_map[analysis_type], analysis_params ) return return_msg({"analysis_type": analysis_type, "result": result}) except Exception as e: logger.error(f"分析机器人数据失败: {str(e)}") return return_msg(f"分析机器人数据失败: {str(e)}")
- Supporting helper method in AdvancedDataAnalyzer class that implements the core analysis dispatching logic for different analysis types (statistical, trend, anomaly, performance, etc.), called directly by the tool handler.def analyze(self, df: pd.DataFrame, analysis_type: AnalysisType, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ 执行分析 Args: df: 数据 analysis_type: 分析类型 config: 分析配置 Returns: Dict[str, Any]: 分析结果 """ config = config or {} if analysis_type == AnalysisType.STATISTICAL: return self.statistical_analyzer.analyze( df, columns=config.get('columns'), group_by=config.get('group_by') ) elif analysis_type == AnalysisType.TREND: trend_config = TrendConfig( method=config.get('method', TrendAnalysisMethod.LINEAR_REGRESSION), window_size=config.get('window_size', 10), degree=config.get('degree', 2) ) return self.trend_analyzer.analyze( df, x_column=config.get('x_column', 'timestamp'), y_column=config.get('y_column'), config=trend_config ) elif analysis_type == AnalysisType.ANOMALY: anomaly_config = AnomalyConfig( method=config.get('method', AnomalyDetectionMethod.Z_SCORE), threshold=config.get('threshold', 3.0) ) result_df = self.anomaly_detector.detect( df, columns=config.get('columns', []), config=anomaly_config ) # 返回异常统计和标记的数据 return { 'anomaly_summary': self.anomaly_detector.get_anomaly_summary(result_df), 'data_with_anomalies': result_df.to_dict('records') } elif analysis_type == AnalysisType.PERFORMANCE: return self.performance_analyzer.analyze_robot_performance( df, operation_column=config.get('operation_column', 'operation_type'), duration_column=config.get('duration_column', 'execution_time') ) elif analysis_type == AnalysisType.ENERGY: return self.energy_analyzer.analyze_energy_consumption( df, voltage_column=config.get('voltage_column', 'voltage'), current_column=config.get('current_column', 'current') ) elif analysis_type == AnalysisType.TRAJECTORY: return self.trajectory_analyzer.analyze_trajectory( df, x_column=config.get('x_column', 'x'), y_column=config.get('y_column', 'y'), z_column=config.get('z_column') ) elif analysis_type == AnalysisType.CORRELATION: # 计算相关性矩阵 numeric_df = df.select_dtypes(include=['float64', 'int64']) return { 'correlation_matrix': numeric_df.corr().to_dict(), 'strongest_correlations': self._find_strongest_correlations(numeric_df) } elif analysis_type == AnalysisType.CLUSTERING: return self._perform_clustering( df, columns=config.get('columns', []), n_clusters=config.get('n_clusters', 3) ) else: return {'error': f'不支持的分析类型: {analysis_type}'}
- Enum defining the supported analysis types used by the tool (statistical, trend, anomaly, performance), mapped in the handler.class AnalysisType(Enum): """分析类型枚举""" STATISTICAL = "statistical" # 统计分析 TREND = "trend" # 趋势分析 ANOMALY = "anomaly" # 异常检测 PERFORMANCE = "performance" # 性能分析 ENERGY = "energy" # 能耗分析 TRAJECTORY = "trajectory" # 轨迹分析 CORRELATION = "correlation" # 相关性分析 CLUSTERING = "clustering" # 聚类分析 CUSTOM = "custom" # 自定义分析