
Data Analysis MCP Server

by boyzhang666
two_correlation_calculate.py (7.6 kB)
import logging
import numpy as np
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
from fastapi import HTTPException, APIRouter
from config.config import *
from routers.utils.openplant import OpenPlant

# Global configuration
router = APIRouter()
logger = logging.getLogger("two_correlation_calculate")
opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout)


class CorrelationMethod(str, Enum):
    """Enumeration of correlation methods; avoids typos from free-form string input."""

    PEARSON = "pearson"
    SPEARMAN = "spearman"
    KENDALL = "kendall"


class CorrelationRequest(BaseModel):
    """Request model for correlation calculation.

    Used to compute the strength of the correlation between two data points.
    Typical use cases: analysing how temperature relates to pressure, flow to pressure, etc.
    """

    point1: str = Field(..., description="Name of the first data point, e.g. 'W3.NODE1.XX'")
    point2: str = Field(..., description="Name of the second data point, e.g. 'W3.NODE1.XX'")
    start_time: str = Field(..., description="Start time, YYYY-MM-DD HH:MM:SS")
    end_time: str = Field(..., description="End time, YYYY-MM-DD HH:MM:SS")
    interval: str = Field(..., description="Sampling interval, e.g. '1m'")
    fill_method: str = Field(default="outer", description="Missing-value fill method")
    method: Optional[CorrelationMethod] = Field(
        default=CorrelationMethod.PEARSON,
        description="Correlation method: pearson (linear), spearman (monotonic), kendall (rank-based)",
    )


class CorrelationResponse(BaseModel):
    """Response model for correlation calculation."""

    correlation: float = Field(
        ..., description="Correlation coefficient, typically between -1 and 1; the closer to ±1, the stronger the correlation"
    )
    method: str = Field(..., description="Correlation method actually used")
    data_length: int = Field(..., description="Number of valid data points used in the calculation (after NaN values are removed)")
    interpretation: str = Field(..., description="Textual interpretation of the correlation strength")


@router.post(
    "/api/correlation",
    response_model=CorrelationResponse,
    operation_id="two_points_correlation_calculate",
    tags=["Correlation Analysis"],
)
async def correlation_calculate(request: CorrelationRequest):
    """
    Compute the correlation coefficient between two series, supporting several correlation methods.

    **Parameters:**
    - **point1**: name of the first data point, e.g. "W3.NODE1.XX"
    - **point2**: name of the second data point, e.g. "W3.NODE1.XX"
    - **start_time**: start time, format YYYY-MM-DD HH:MM:SS
    - **end_time**: end time, format YYYY-MM-DD HH:MM:SS
    - **interval**: sampling interval, e.g. '1m', '5m', '1h'
    - **fill_method**: missing-value fill method (default: outer)
    - **method**: correlation method
        - pearson: Pearson correlation coefficient, measures linear correlation (default)
        - spearman: Spearman rank correlation coefficient, measures monotonic correlation
        - kendall: Kendall's tau, a rank-based correlation coefficient

    **Returns:**
    - correlation: correlation coefficient (between -1 and 1)
    - method: method used
    - data_length: number of valid data points
    - interpretation: interpretation of the correlation strength

    **Example request:**
    ```json
    {
        "point1": "temperature_sensor_01",
        "point2": "pressure_sensor_01",
        "start_time": "2024-01-01 00:00:00",
        "end_time": "2024-01-01 23:59:59",
        "interval": "1m",
        "method": "pearson"
    }
    ```
    """
    try:
        # 1. Fetch the data
        df_data = opt.api_select_to_frame(
            point_list=[request.point1, request.point2],
            start_time=request.start_time,
            end_time=request.end_time,
            interval=request.interval,
            fill_method=request.fill_method,
        )

        # 2. Validate the fetched data
        if df_data is None or df_data.empty:
            raise HTTPException(
                status_code=422,
                detail={
                    "error_type": "data retrieval failed",
                    "message": f"No data could be retrieved for point {request.point1} or {request.point2}",
                    "solution": "Check that the point names are correct and that the time range contains data",
                },
            )

        # 3. Extract the two series and convert them to numpy arrays
        data1 = df_data[request.point1].values
        data2 = df_data[request.point2].values
        method = request.method.value  # unwrap the enum value

        array1 = np.array(data1)
        array2 = np.array(data2)

        # Remove positions where either series is NaN
        valid_mask = ~(np.isnan(array1) | np.isnan(array2))
        valid_array1 = array1[valid_mask]
        valid_array2 = array2[valid_mask]

        if len(valid_array1) < 2:
            raise HTTPException(
                status_code=422,
                detail={
                    "error_type": "insufficient valid data",
                    "message": f"Only {len(valid_array1)} valid data points remain after removing NaN values; at least 2 are required",
                    "original_length": len(array1),
                    "valid_length": len(valid_array1),
                    "solution": "Widen the time range or adjust the sampling interval to obtain more data",
                },
            )

        # Compute the correlation
        if method == "pearson":
            # Pearson correlation coefficient
            correlation = np.corrcoef(valid_array1, valid_array2)[0, 1]
        elif method == "spearman":
            # Spearman rank correlation coefficient
            from scipy import stats

            correlation, _ = stats.spearmanr(valid_array1, valid_array2)
        else:  # kendall
            # Kendall rank correlation coefficient (tau)
            from scipy import stats

            correlation, _ = stats.kendalltau(valid_array1, valid_array2)

        # Interpret the correlation strength
        abs_corr = abs(correlation)
        if abs_corr >= 0.8:
            interpretation = "strong correlation"
        elif abs_corr >= 0.6:
            interpretation = "moderate correlation"
        elif abs_corr >= 0.3:
            interpretation = "weak correlation"
        else:
            interpretation = "little or no correlation"

        if correlation < 0:
            interpretation = "negative " + interpretation
        elif correlation > 0:
            interpretation = "positive " + interpretation
        else:
            interpretation = "no correlation"

        # Build the response
        return CorrelationResponse(
            correlation=float(correlation),
            method=method,
            data_length=len(valid_array1),
            interpretation=interpretation,
        )

    except HTTPException:
        # Re-raise HTTPExceptions unchanged so the 422 responses raised above
        # are not swallowed by the generic handler and turned into 500 errors
        raise
    except ValueError as e:
        # Parameter validation errors
        raise HTTPException(
            status_code=422,
            detail={
                "error_type": "parameter validation error",
                "message": str(e),
                "suggestions": [
                    "Check that the point names are correct",
                    "Make sure the time format is YYYY-MM-DD HH:MM:SS",
                    "Verify that the time range is sensible",
                ],
            },
        )
    except Exception as e:
        # Any other computation error
        raise HTTPException(
            status_code=500,
            detail={
                "error_type": "calculation error",
                "message": f"An error occurred while computing the correlation: {str(e)}",
                "suggestions": [
                    "Check that the data points exist",
                    "Confirm that the time range contains enough data",
                    "Try a different correlation method",
                ],
            },
        )
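For context, here is a minimal client sketch for exercising the endpoint above. The base URL, port, and point names are illustrative assumptions (the real host and point naming depend on the deployment); only the request and response fields come from the CorrelationRequest and CorrelationResponse models.

```python
# Minimal client sketch, assuming the FastAPI app is served at http://localhost:8000.
# The point names below are hypothetical placeholders, not points defined by this repository.
import requests

payload = {
    "point1": "W3.NODE1.TEMP",      # hypothetical point name
    "point2": "W3.NODE1.PRESS",     # hypothetical point name
    "start_time": "2024-01-01 00:00:00",
    "end_time": "2024-01-01 23:59:59",
    "interval": "1m",
    "fill_method": "outer",
    "method": "spearman",           # or "pearson" / "kendall"
}

resp = requests.post("http://localhost:8000/api/correlation", json=payload, timeout=30)
resp.raise_for_status()
result = resp.json()
print(f"{result['method']}: {result['correlation']:.3f} "
      f"({result['interpretation']}, n={result['data_length']})")
```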
