Skip to main content
Glama

Data Analysis MCP Server

by boyzhang666
single_statistic_calculate.py16 kB
import logging
from enum import Enum
from typing import Dict, Optional

import numpy as np
from arch.unitroot import PhillipsPerron
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from statsmodels.tsa.stattools import adfuller, kpss

from config.config import *
from routers.utils.openplant import OpenPlant

# Global configuration: router, module logger, and the OpenPlant data client.
router = APIRouter()
logger = logging.getLogger("single_statistic_calculate")
opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout)


class TestType(str, Enum):
    """Supported stationarity test types."""

    ADF = "adf"
    PP = "pp"
    KPSS = "kpss"
    CONSTANT = "constant"  # degenerate case: (near-)constant series


class StationarityRequest(BaseModel):
    """Request model for time-series stationarity testing.

    The service fetches the series from OpenPlant and automatically selects
    the most suitable test from the data's characteristics; the significance
    level is fixed at 0.05. Typical use cases: data preprocessing and model
    selection before time-series analysis.
    """

    gn_point_name: str = Field(
        ..., description="OpenPlant数据点名称,格式为 W3.NODE1.XX"
    )
    start_time: str = Field(..., description="分析起始时间,格式为YYYY-MM-DD HH:MM:SS")
    end_time: str = Field(..., description="分析结束时间,格式为YYYY-MM-DD HH:MM:SS")
    interval: str = Field(..., description="数据采样间隔,例如'1m'表示1分钟")
    fill_method: str = Field(
        default="outer", description="缺失值填充方法,例如'outer'表示外插"
    )


class StationarityResponse(BaseModel):
    """Response model for the stationarity test."""

    is_stationary: bool = Field(..., description="时间序列是否平稳")
    test_statistic: float = Field(..., description="检验统计量值")
    p_value: float = Field(..., description="p值,用于判断显著性")
    method: str = Field(..., description="使用的检验方法说明")
    method_type: str = Field(
        ..., description="使用的检验方法类型:adf/pp/kpss/constant"
    )
    significance_level: float = Field(..., description="用于判定的显著性水平")
    data_length: int = Field(..., description="参与检验的有效数据点数量")
    original_length: int = Field(..., description="原始数据点数量(包含缺失值)")
    missing_count: int = Field(..., description="缺失值数量")
    mean: float = Field(..., description="数据均值")
    std: float = Field(..., description="数据标准差")
    variance: float = Field(..., description="数据方差")
    trend_slope: float = Field(..., description="线性趋势斜率的绝对值")
    relative_trend: float = Field(..., description="相对趋势强度(斜率/均值)")
    cv: float = Field(..., description="变异系数(标准差/均值)")
    lags_used: Optional[int] = Field(None, description="检验使用的滞后阶数(若适用)")
    critical_values: Optional[Dict[str, float]] = Field(
        None, description="检验的临界值(若适用)"
    )
    selection_rationale: str = Field(..., description="方法选择的理由")
    interpretation: str = Field(..., description="检验结果的文字解释")


# ---------- Helper functions ---------- #
def compute_features(data_array: np.ndarray) -> Dict[str, float]:
    """Compute summary statistics and trend indicators for the series.

    Returns a dict with sample size, mean, std, variance, the absolute
    slope of a linear fit, relative trend strength (slope / |mean|) and
    the coefficient of variation (std / |mean|).
    """
    n = len(data_array)
    mean = float(np.mean(data_array))
    std = float(np.std(data_array))
    variance = float(np.var(data_array))
    # Absolute slope of a least-squares linear fit against the sample index.
    x = np.arange(n)
    trend_slope = float(abs(np.polyfit(x, data_array, 1)[0]))
    # Relative indicators; guard against a (near-)zero mean to avoid
    # division blow-up — fall back to the raw slope / std in that case.
    if abs(mean) > 1e-12:
        relative_trend = float(trend_slope / (abs(mean) + 1e-12))
        cv = float(std / (abs(mean) + 1e-12))
    else:
        relative_trend = trend_slope
        cv = std
    return {
        "n": n,
        "mean": mean,
        "std": std,
        "variance": variance,
        "trend_slope": trend_slope,
        "relative_trend": relative_trend,
        "cv": cv,
    }


def select_optimal_test(features: Dict[str, float]) -> Dict[str, Optional[str]]:
    """Pick the most suitable stationarity test based on data features.

    Selection logic:
    - Small sample (n < 50): ADF test.
    - Large sample (n >= 100) with weak trend (relative_trend <= 0.01):
      Phillips-Perron test.
    - Medium sample (50 <= n < 100) with low variability (cv < 0.5): KPSS;
      trend-stationary regression ('ct') when relative_trend > 0.05,
      otherwise level-stationary ('c').
    - Anything else: ADF test (general-purpose default).
    """
    n = features["n"]
    relative_trend = features["relative_trend"]
    cv = features["cv"]

    if n < 50:
        return {
            "type": TestType.ADF.value,
            "description": "ADF检验(适用于小样本时间序列)",
            "rationale": f"样本量较小(n={n}),ADF更稳健",
            "kpss_regression": None,
        }
    elif n >= 100 and relative_trend <= 0.01:
        return {
            "type": TestType.PP.value,
            "description": "Phillips-Perron检验(适用于大样本且趋势不明显的序列)",
            "rationale": f"样本量大(n={n})且相对趋势较弱(relative_trend={relative_trend:.4f})",
            "kpss_regression": None,
        }
    elif 50 <= n < 100 and cv < 0.5:
        regression = "ct" if relative_trend > 0.05 else "c"
        reg_desc = "趋势平稳" if regression == "ct" else "水平平稳"
        return {
            "type": TestType.KPSS.value,
            "description": f"KPSS检验(适用于中等样本低变异序列,{reg_desc})",
            "rationale": f"样本量中等(n={n})且变异系数较低(cv={cv:.4f}),相对趋势={relative_trend:.4f}",
            "kpss_regression": regression,
        }
    else:
        return {
            "type": TestType.ADF.value,
            "description": "ADF检验(默认选择)",
            "rationale": f"不满足PP/KPSS的特定条件,选择通用的ADF",
            "kpss_regression": None,
        }


def run_adf(array: np.ndarray, alpha: float) -> Dict[str, Optional[float]]:
    """Run the ADF test (null hypothesis: a unit root is present).

    A p-value below ``alpha`` rejects the null, i.e. the series is
    considered stationary.
    """
    # autolag="AIC" gives a more robust automatic lag-length selection.
    result = adfuller(array, autolag="AIC")
    test_statistic = float(result[0])
    p_value = float(result[1])
    lags_used = int(result[2])
    nobs = int(result[3])
    critical_values = {k: float(v) for k, v in result[4].items()}
    is_stationary = p_value < alpha
    return {
        "test_statistic": test_statistic,
        "p_value": p_value,
        "is_stationary": is_stationary,
        "lags_used": lags_used,
        "nobs": nobs,
        "critical_values": critical_values,
        "method_type": TestType.ADF.value,
    }


def run_pp(array: np.ndarray, alpha: float) -> Dict[str, Optional[float]]:
    """Run the Phillips-Perron test (null hypothesis: a unit root is present).

    As with ADF, a p-value below ``alpha`` indicates stationarity.
    """
    pp_test = PhillipsPerron(array)
    test_statistic = float(pp_test.stat)
    p_value = float(pp_test.pvalue)
    is_stationary = p_value < alpha
    # arch's result object does expose critical values and the lag length
    # used (``critical_values`` / ``lags``); report them for parity with
    # the ADF/KPSS branches instead of returning None.
    critical_values = {str(k): float(v) for k, v in pp_test.critical_values.items()}
    return {
        "test_statistic": test_statistic,
        "p_value": p_value,
        "is_stationary": is_stationary,
        "lags_used": int(pp_test.lags),
        "nobs": len(array),
        "critical_values": critical_values,
        "method_type": TestType.PP.value,
    }


def run_kpss(
    array: np.ndarray, alpha: float, regression: str = "ct"
) -> Dict[str, Optional[float]]:
    """Run the KPSS test.

    Note: KPSS's null hypothesis is "stationary", so the decision rule is
    the *opposite* of ADF/PP — the series is deemed stationary when the
    null is NOT rejected (p-value >= alpha). statsmodels clamps the
    reported p-value to the [0.01, 0.1] table range.
    """
    result = kpss(array, regression=regression)
    test_statistic = float(result[0])
    p_value = float(result[1])
    lags_used = int(result[2])
    critical_values = {k: float(v) for k, v in result[3].items()}
    is_stationary = p_value >= alpha
    return {
        "test_statistic": test_statistic,
        "p_value": p_value,
        "is_stationary": is_stationary,
        "lags_used": lags_used,
        "nobs": len(array),
        "critical_values": critical_values,
        "method_type": TestType.KPSS.value,
    }


def build_interpretation(
    is_stationary: bool, p_value: float, alpha: float, method_type: str
) -> str:
    """Build a human-readable interpretation of the test outcome.

    Handles the inverted null hypothesis of KPSS versus ADF/PP.
    """
    base = f"显著性水平为 {alpha:.2f},p值为 {p_value:.4f}。"
    if method_type == TestType.KPSS.value:
        # KPSS: the null hypothesis is stationarity.
        if is_stationary:
            return base + "根据KPSS检验,无法拒绝原假设,因此序列可视为平稳。"
        else:
            return base + "根据KPSS检验,拒绝原假设,因此序列可能存在非平稳性。"
    else:
        # ADF/PP: the null hypothesis is a unit root (non-stationarity).
        if is_stationary:
            return base + "根据ADF/PP检验,拒绝存在单位根的原假设,因此序列可视为平稳。"
        else:
            return (
                base
                + "根据ADF/PP检验,无法拒绝存在单位根的原假设,因此序列可能为非平稳。"
            )


@router.post(
    "/api/statistic_calculate",
    response_model=StationarityResponse,
    operation_id="single_point_statistic_calculate",
    tags=["统计检验"],
)
async def statistic_calculate(request: StationarityRequest):
    """
    Test a time series for stationarity, automatically choosing the most
    suitable method.

    See the request model for parameter details. Method selection and
    response notes:
    - ADF / PP / KPSS is picked automatically from the data's features
    - the significance level is fixed at 0.05
    - the response includes summary statistics, the selection rationale,
      critical values and lag information
    """
    try:
        # Fetch the series from the data service.
        point_list = [request.gn_point_name]
        df_data = opt.api_select_to_frame(
            point_list,
            request.start_time,
            request.end_time,
            "span",
            request.interval,
            fill_method=request.fill_method,
        )
        if df_data is None or df_data.empty:
            raise HTTPException(
                status_code=404,
                detail={
                    "error_type": "数据获取失败",
                    "message": f"无法获取数据点 {request.gn_point_name} 的数据",
                    "point_name": request.gn_point_name,
                    "time_range": f"{request.start_time} 到 {request.end_time}",
                    "solution": "请检查数据点名称是否正确,时间范围是否正确,时间范围内是否有数据",
                },
            )

        # Raw length vs. cleaned (NaN-free) data.
        series = df_data[request.gn_point_name].values
        original_length = len(series)
        array = np.asarray(series, dtype=float)
        missing_mask = np.isnan(array)
        missing_count = int(np.sum(missing_mask))
        array = array[~missing_mask]

        if len(array) < 10:
            raise HTTPException(
                status_code=422,
                detail={
                    "error_type": "数据不足",
                    "message": f"有效数据只有{len(array)}个,平稳性检验至少需要10个观测值",
                    "recommendation": "请扩大时间范围或调整采样间隔",
                },
            )

        significance_level = 0.05

        # compute_features is pure, so compute it once and reuse its std
        # for the constant-series short-circuit below.
        features = compute_features(array)

        # A (near-)constant series is trivially stationary — skip the tests.
        if features["std"] < 1e-12:
            interpretation = "序列为常量(或近似常量),理论上为平稳序列。"
            logger.info(
                f"[Stationarity] Constant series detected: mean={features['mean']:.6f}, std={features['std']:.6e}"
            )
            return StationarityResponse(
                is_stationary=True,
                test_statistic=0.0,
                p_value=0.0,
                method="常量序列判定",
                method_type=TestType.CONSTANT.value,
                significance_level=significance_level,
                data_length=len(array),
                original_length=original_length,
                missing_count=missing_count,
                mean=features["mean"],
                std=features["std"],
                variance=features["variance"],
                trend_slope=features["trend_slope"],
                relative_trend=features["relative_trend"],
                cv=features["cv"],
                lags_used=None,
                critical_values=None,
                selection_rationale="数据标准差近似为0,直接视为平稳",
                interpretation=interpretation,
            )

        # Choose a method from the features and run the corresponding test.
        choice = select_optimal_test(features)
        test_type = choice["type"]
        method_description = choice["description"]
        rationale = choice["rationale"]
        kpss_regression = choice["kpss_regression"]

        logger.info(
            f"[Stationarity] choice={test_type}, rationale={rationale}, features={features}"
        )

        if test_type == TestType.ADF.value:
            result = run_adf(array, significance_level)
        elif test_type == TestType.PP.value:
            result = run_pp(array, significance_level)
        elif test_type == TestType.KPSS.value:
            result = run_kpss(
                array, significance_level, regression=kpss_regression or "ct"
            )
        else:
            raise HTTPException(
                status_code=500,
                detail={
                    "error_type": "方法选择错误",
                    "message": f"未知的检验方法: {test_type}",
                },
            )

        interpretation = build_interpretation(
            result["is_stationary"],
            result["p_value"],
            significance_level,
            result["method_type"],
        )

        return StationarityResponse(
            is_stationary=bool(result["is_stationary"]),
            test_statistic=float(result["test_statistic"]),
            p_value=float(result["p_value"]),
            method=method_description,
            method_type=str(result["method_type"]),
            significance_level=significance_level,
            data_length=len(array),
            original_length=original_length,
            missing_count=missing_count,
            mean=features["mean"],
            std=features["std"],
            variance=features["variance"],
            trend_slope=features["trend_slope"],
            relative_trend=features["relative_trend"],
            cv=features["cv"],
            lags_used=result["lags_used"],
            critical_values=result["critical_values"],
            selection_rationale=rationale,
            interpretation=interpretation,
        )
    except HTTPException:
        # Re-raise the HTTPExceptions deliberately raised above (404/422/500)
        # unchanged; the generic handler below would otherwise rewrap them
        # as 500 "计算错误" responses with the wrong detail.
        raise
    except ValueError as e:
        # Pydantic validation errors or invalid values in the statistics.
        logger.exception("ValueError in statistic_calculate: %s", str(e))
        raise HTTPException(
            status_code=422,
            detail={
                "error_type": "参数或数据验证错误",
                "message": str(e),
                "suggestions": [
                    "检查数据格式是否正确",
                    "确保数据为数值类型",
                    "验证显著性水平参数在有效范围内",
                ],
            },
        )
    except Exception as e:
        # Any other computation failure.
        logger.exception("Unhandled error in statistic_calculate: %s", str(e))
        raise HTTPException(
            status_code=500,
            detail={
                "error_type": "计算错误",
                "message": f"平稳性检验过程中发生错误: {str(e)}",
                "suggestions": [
                    "检查数据是否包含异常值",
                    "确认数据类型正确",
                    "尝试使用不同的检验方法",
                ],
            },
        )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boyzhang666/data-analysys-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.