# single_distribution_analysis.py
import logging
import numpy as np
from typing import List, Optional
from pydantic import BaseModel, Field
from fastapi import HTTPException, APIRouter
from config.config import *
from routers.utils.openplant import OpenPlant
# Global configuration: router, logger and the shared OpenPlant client.
router = APIRouter()
logger = logging.getLogger("single_distribution_analysis")
# config_host / config_port / config_timeout come from the star-import of config.config
opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout)
class DistributionRequest(BaseModel):
    """Request model for the single-point distribution/trend analysis endpoint."""
    # Full OpenPlant tag name, e.g. "W3.NODE1.XX"
    gn_point_name: str = Field(
        ..., description="OpenPlant数据点名称,格式为 W3.NODE1.XX"
    )
    # Analysis window bounds, formatted "YYYY-MM-DD HH:MM:SS"
    start_time: str = Field(..., description="分析起始时间,格式为YYYY-MM-DD HH:MM:SS")
    end_time: str = Field(..., description="分析结束时间,格式为YYYY-MM-DD HH:MM:SS")
    # Sampling interval passed straight to the OpenPlant span query, e.g. "1m"
    interval: str = Field(..., description="数据采样间隔,例如'1m'表示1分钟")
    # Gap-fill strategy forwarded to api_select_to_frame (default "outer" = extrapolate)
    fill_method: str = Field(
        default="outer", description="缺失值填充方法,例如'outer'表示外插"
    )
class DistributionResponse(BaseModel):
    """Response model for the distribution/trend analysis endpoint."""
    # Keys: "Q1", "Q2 (median)", "Q3", "IQR" (see calc_quartiles)
    quartiles: dict = Field(
        ..., description="四分位数信息,包含Q1、Q2(中位数)、Q3和IQR"
    )
    # Keys: mean/std/min/max/count/variance/range/coefficient_of_variation
    statistics: dict = Field(
        ..., description="基本统计信息,包含均值、标准差、数据量等"
    )
    # Human-readable (Chinese) summary of the distribution
    distribution_summary: str = Field(..., description="数据分布特征的文字总结")
    # Human-readable (Chinese) trend description
    trend_analysis: str = Field(..., description="数据趋势分析结果")
    # Keys: skewness/kurtosis/is_symmetric/distribution_shape/tail_heaviness
    shape_characteristics: dict = Field(
        ..., description="分布形状特征,包含偏度、峰度等"
    )
# ===================== 辅助统计函数 =====================
def calc_quartiles(data: np.ndarray) -> dict:
    """Return the quartiles (Q1, median, Q3) of *data* plus the interquartile range."""
    first, median, third = (float(v) for v in np.percentile(data, (25, 50, 75)))
    return {
        "Q1": first,
        "Q2 (median)": median,
        "Q3": third,
        "IQR": third - first,
    }
def calc_statistics(data: np.ndarray) -> dict:
    """Compute basic descriptive statistics for *data*.

    Returns a dict with mean, (population) std/variance, min, max, count,
    range, and the coefficient of variation (0 when the mean is 0 to avoid
    a division by zero).
    """
    mean_val = float(np.mean(data))
    std_val = float(np.std(data))
    # Hoist min/max: the original recomputed np.min/np.max for "range".
    min_val = float(np.min(data))
    max_val = float(np.max(data))
    return {
        "mean": mean_val,
        "std": std_val,
        "min": min_val,
        "max": max_val,
        "count": len(data),
        "variance": float(np.var(data)),
        "range": max_val - min_val,
        "coefficient_of_variation": float(std_val / mean_val) if mean_val != 0 else 0,
    }
def calc_shape_characteristics(
    data: np.ndarray, mean_val: float, std_val: float
) -> dict:
    """Compute distribution-shape features: skewness, excess kurtosis and labels.

    Fix: a constant series has std == 0, and the original divided by it,
    producing NaN skewness/kurtosis that poisoned every downstream label.
    A constant series is treated as perfectly symmetric with normal tails.
    """
    if std_val == 0:
        skewness = 0.0
        kurtosis = 0.0
    else:
        z = (data - mean_val) / std_val  # standardized scores
        skewness = float(np.mean(z ** 3))
        # Excess kurtosis: 0 for a normal distribution.
        kurtosis = float(np.mean(z ** 4)) - 3
    return {
        "skewness": skewness,
        "kurtosis": kurtosis,
        "is_symmetric": abs(skewness) < 0.5,
        "distribution_shape": (
            "近似对称"
            if abs(skewness) < 0.5
            else ("右偏(正偏)" if skewness > 0.5 else "左偏(负偏)")
        ),
        "tail_heaviness": (
            "正常" if abs(kurtosis) < 0.5 else ("厚尾" if kurtosis > 0.5 else "薄尾")
        ),
    }
def build_distribution_summary(
    shape_characteristics: dict, quartiles: dict, statistics: dict, data: np.ndarray
) -> str:
    """Assemble the human-readable distribution summary.

    ``data`` is currently unused but kept for interface compatibility.
    """
    pieces = [
        f"数据呈{shape_characteristics['distribution_shape']}分布,",
        f"中位数为{quartiles['Q2 (median)']:.2f},",
        f"四分位距为{quartiles['IQR']:.2f}。",
        f"数据范围从{statistics['min']:.2f}到{statistics['max']:.2f},",
        f"变异系数为{statistics['coefficient_of_variation']:.3f}。",
    ]
    return "".join(pieces)
def build_trend_analysis(
    iqr: float, statistics: dict, shape_characteristics: dict
) -> str:
    """Build the trend description: a base variability message, optionally
    extended with skewness and tail-heaviness remarks."""
    skewness = shape_characteristics["skewness"]
    kurtosis = shape_characteristics["kurtosis"]
    cov = statistics["coefficient_of_variation"]
    # Base message selected by concentration / variability.
    if iqr == 0:
        base = (
            "数据集中度极高,所有数据点都集中在四分位数范围内,显示出极强的一致性。"
        )
    elif cov < 0.1:
        base = "数据变异性很小,显示出高度的稳定性和一致性。"
    elif cov < 0.3:
        base = "数据变异性适中,整体趋势相对稳定。"
    else:
        base = "数据变异性较大,存在明显的波动和分散趋势。"
    segments = [base]
    # Pronounced asymmetry remark.
    if abs(skewness) > 1:
        segments.append(
            f" 分布明显{shape_characteristics['distribution_shape']},数据存在不对称的趋势特征。"
        )
    # Pronounced tail-weight remark.
    if abs(kurtosis) > 1:
        segments.append(
            f" 分布呈现{shape_characteristics['tail_heaviness']}特征,极值出现的概率{'较高' if kurtosis > 1 else '较低'}。"
        )
    return "".join(segments)
# ===================== 主分布分析接口 =====================
@router.post(
    "/api/distribution_analysis",
    response_model=DistributionResponse,
    operation_id="single_point_distribution_analysis",
)
async def distribution_analysis(request: DistributionRequest):
    """
    Run the distribution/trend analysis for a single OpenPlant point.

    Fetches the point's samples over the requested window, requires at
    least 5 valid observations, then returns quartiles, basic statistics,
    shape characteristics and text summaries.

    Raises:
        HTTPException 404: the point returned no data.
        HTTPException 422: fewer than 5 valid observations.
        HTTPException 500: any unexpected failure during analysis.
    """
    try:
        logger.info(
            f"开始获取数据点 {request.gn_point_name} 的数据,时间范围:{request.start_time} 到 {request.end_time}"
        )
        point_list = [request.gn_point_name]
        df_data = opt.api_select_to_frame(
            point_list,
            request.start_time,
            request.end_time,
            "span",
            request.interval,
            fill_method=request.fill_method,
        )
        if df_data is None or df_data.empty:
            raise HTTPException(
                status_code=404,
                detail={
                    "error_type": "数据获取失败",
                    "message": f"无法获取数据点 {request.gn_point_name} 的数据",
                    "point_name": request.gn_point_name,
                    "time_range": f"{request.start_time} 到 {request.end_time}",
                    "solution": "请检查数据点名称是否正确,时间范围是否有数据",
                },
            )
        logger.info(f"返回的数据包含列: {list(df_data.columns)}")
        # Drop missing samples; analysis operates on the raw value array.
        data = np.array(df_data[request.gn_point_name].dropna())
        logger.info(f"成功获取 {len(data)} 个有效数据点")
        if len(data) < 5:
            raise HTTPException(
                status_code=422,
                detail={
                    "error_type": "数据不足",
                    "message": f"当前数据只有{len(data)}个观测值,分布分析至少需要5个",
                    "current_length": len(data),
                    "minimum_required": 5,
                    "recommendation": "建议扩大时间范围或调整采样间隔以获得更多数据",
                    "solution": "请调整查询参数以获取更多数据样本",
                },
            )
        quartiles = calc_quartiles(data)
        statistics = calc_statistics(data)
        shape_characteristics = calc_shape_characteristics(
            data, statistics["mean"], statistics["std"]
        )
        distribution_summary = build_distribution_summary(
            shape_characteristics, quartiles, statistics, data
        )
        trend_analysis = build_trend_analysis(
            quartiles["IQR"], statistics, shape_characteristics
        )
        return DistributionResponse(
            quartiles=quartiles,
            statistics=statistics,
            distribution_summary=distribution_summary,
            trend_analysis=trend_analysis,
            shape_characteristics=shape_characteristics,
        )
    except HTTPException:
        # Bug fix: the blanket handler below used to swallow the deliberate
        # 404/422 responses raised above and re-emit them as opaque 500s.
        raise
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"分布趋势分析错误: {str(e)}")
        raise HTTPException(
            status_code=500, detail=f"分布趋势分析过程中发生错误: {str(e)}"
        )