# two_correlation_calculate.py (7.6 kB)
import logging
import numpy as np
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
from fastapi import HTTPException, APIRouter
from config.config import *
from routers.utils.openplant import OpenPlant
# Global configuration: the router this module's endpoints are registered on, the
# module logger, and one OpenPlant client shared by every request handled here.
router = APIRouter()
logger = logging.getLogger("two_correlation_calculate")
# Constructed once, when the module is imported. config_host / config_port /
# config_timeout are not defined in this file; presumably they come from the
# `from config.config import *` line - TODO confirm.
opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout)
# Closed set of correlation methods accepted by the endpoint. Subclassing str means
# a member compares equal to its plain value (CorrelationMethod.PEARSON == "pearson").
# The docstring is left verbatim: schema generators may publish it as the enum's description.
class CorrelationMethod(str, Enum):
    """相关性计算方法枚举,避免字符串输入错误"""
    # Pearson product-moment correlation (linear association).
    PEARSON = "pearson"
    # Spearman rank correlation (monotonic association).
    SPEARMAN = "spearman"
    # Kendall tau rank correlation.
    KENDALL = "kendall"
# Request body for POST /api/correlation. The docstring, field order, defaults and
# description strings all feed the generated OpenAPI schema, so they are kept as-is.
class CorrelationRequest(BaseModel):
    """相关性计算请求模型
    用于计算两个数据点之间的相关性强度。
    典型应用场景:分析温度与压力的关联性、流量与压力的相关性等。
    """

    # Names of the two points to correlate; both are required.
    point1: str = Field(default=..., description="第一个数据点名称, 例如 'W3.NODE1.XX'")
    point2: str = Field(default=..., description="第二个数据点名称, 例如 'W3.NODE1.XX'")
    # Query window. Kept as plain strings: no parsing or validation happens in this model.
    start_time: str = Field(default=..., description="开始时间 YYYY-MM-DD HH:MM:SS")
    end_time: str = Field(default=..., description="结束时间 YYYY-MM-DD HH:MM:SS")
    # Sampling step string such as '1m'.
    interval: str = Field(default=..., description="采样间隔,如'1m'")
    # NOTE(review): described as a missing-value fill method, but the default "outer"
    # reads like a join mode - confirm the accepted values with the data-source API.
    fill_method: str = Field("outer", description="缺失值填充方法")
    # Optional[...] means an explicit JSON null is accepted and stored as None;
    # an omitted field falls back to PEARSON.
    method: Optional[CorrelationMethod] = Field(
        CorrelationMethod.PEARSON,
        description="相关性计算方法:pearson(线性相关)、spearman(单调相关)、kendall(秩相关)",
    )
# Response body for POST /api/correlation. The docstring and description strings
# appear in the OpenAPI schema, so they are kept verbatim.
class CorrelationResponse(BaseModel):
    """相关性计算响应模型"""

    # The correlation coefficient itself.
    correlation: float = Field(
        default=..., description="相关系数值,范围通常在-1到1之间,越接近±1表示相关性越强"
    )
    # Name of the method that produced the coefficient.
    method: str = Field(default=..., description="实际使用的相关性计算方法")
    # Number of sample pairs that took part in the calculation.
    data_length: int = Field(default=..., description="参与计算的有效数据点数量(排除NaN值后)")
    # Human-readable description of the correlation strength.
    interpretation: str = Field(default=..., description="相关性强度的文字解释")
def _interpret_correlation(correlation: float) -> str:
    """Map a finite correlation coefficient to a Chinese strength label.

    |r| >= 0.8 -> strong, >= 0.6 -> moderate, >= 0.3 -> weak, otherwise
    "almost none"; the label is prefixed with "正" (positive) or "负"
    (negative). An exact 0 yields "无相关".
    """
    magnitude = abs(correlation)
    if magnitude >= 0.8:
        label = "强相关"
    elif magnitude >= 0.6:
        label = "中等相关"
    elif magnitude >= 0.3:
        label = "弱相关"
    else:
        label = "几乎无相关"
    if correlation < 0:
        return "负" + label
    if correlation > 0:
        return "正" + label
    return "无相关"


def _compute_correlation(x: np.ndarray, y: np.ndarray, method: str) -> float:
    """Return the correlation coefficient of two equal-length float arrays.

    `method` is a CorrelationMethod value: "pearson" uses numpy, "spearman"
    uses scipy.stats.spearmanr, and anything else is treated as "kendall"
    (scipy.stats.kendalltau). The p-values scipy returns are discarded.
    """
    if method == "pearson":
        return float(np.corrcoef(x, y)[0, 1])
    # scipy is only needed for the rank-based methods, so it is imported lazily.
    from scipy import stats

    if method == "spearman":
        coefficient, _ = stats.spearmanr(x, y)
    else:  # kendall
        coefficient, _ = stats.kendalltau(x, y)
    return float(coefficient)


# POST /api/correlation: fetch both points from OpenPlant over the same window and
# return their correlation coefficient, the method used, the number of sample pairs
# kept after NaN removal, and a Chinese strength label.
#
# Error mapping:
#   * 422 - no data returned, a requested point missing from the result, fewer than
#           two NaN-free sample pairs, a constant series or another non-finite
#           coefficient, or a ValueError while processing the data;
#   * 500 - any other unexpected exception (logged with its traceback).
# The Chinese docstring is kept verbatim because FastAPI publishes it as the
# OpenAPI description of this operation.
@router.post(
    "/api/correlation",
    response_model=CorrelationResponse,
    operation_id="two_points_correlation_calculate",
    tags=["相关性分析"],
)
async def correlation_calculate(request: CorrelationRequest):
    """
    计算两组数据之间的相关系数,支持多种相关性分析方法
    **参数说明:**
    - **point1**: 第一个数据点名称, 例如 "W3.NODE1.XX"
    - **point2**: 第二个数据点名称, 例如 "W3.NODE1.XX"
    - **start_time**: 开始时间,格式:YYYY-MM-DD HH:MM:SS
    - **end_time**: 结束时间,格式:YYYY-MM-DD HH:MM:SS
    - **interval**: 采样间隔,如'1m'、'5m'、'1h'等
    - **fill_method**: 缺失值填充方法(默认:outer)
    - **method**: 相关性计算方法
      - pearson: 皮尔逊相关系数,衡量线性相关性(默认)
      - spearman: 斯皮尔曼秩相关系数,衡量单调相关性
      - kendall: 肯德尔τ相关系数,基于秩次的相关性
    **返回结果:**
    - correlation: 相关系数(-1到1之间)
    - method: 使用的计算方法
    - data_length: 有效数据点数量
    - interpretation: 相关性强度解释
    **使用示例:**
    ```json
    {
        "point1": "temperature_sensor_01",
        "point2": "pressure_sensor_01",
        "start_time": "2024-01-01 00:00:00",
        "end_time": "2024-01-01 23:59:59",
        "interval": "1m",
        "method": "pearson"
    }
    ```
    """
    try:
        # NOTE(review): api_select_to_frame is called without await, i.e. synchronously,
        # so it blocks the event loop while the query runs. Consider a threadpool
        # once it is confirmed that the shared `opt` client is thread-safe.
        df_data = opt.api_select_to_frame(
            point_list=[request.point1, request.point2],
            start_time=request.start_time,
            end_time=request.end_time,
            interval=request.interval,
            fill_method=request.fill_method,
        )
        if df_data is None or df_data.empty:
            raise HTTPException(
                status_code=422,
                detail={
                    "error_type": "数据获取失败",
                    "message": f"无法获取数据点 {request.point1} 或 {request.point2} 的数据",
                    "solution": "请检查数据点名称是否正确,时间范围是否有数据",
                },
            )

        # Assumes df_data is a pandas DataFrame with one column per point name - TODO
        # confirm. Report a missing column as a client error instead of a KeyError/500.
        missing_points = [
            point
            for point in (request.point1, request.point2)
            if point not in df_data.columns
        ]
        if missing_points:
            raise HTTPException(
                status_code=422,
                detail={
                    "error_type": "数据获取失败",
                    "message": f"返回的数据中缺少数据点 {', '.join(missing_points)}",
                    "solution": "请检查数据点名称是否正确,时间范围是否有数据",
                },
            )

        # The field is Optional, so an explicit JSON null arrives as None; fall back
        # to the declared default instead of failing on None.value.
        method = (request.method or CorrelationMethod.PEARSON).value

        # dtype=float turns None entries in object columns into NaN (np.isnan would
        # raise TypeError on an object array); non-numeric text raises ValueError.
        array1 = np.asarray(df_data[request.point1].values, dtype=float)
        array2 = np.asarray(df_data[request.point2].values, dtype=float)

        # Keep only the rows where both series have a value.
        valid_mask = ~(np.isnan(array1) | np.isnan(array2))
        valid_array1 = array1[valid_mask]
        valid_array2 = array2[valid_mask]
        if len(valid_array1) < 2:
            raise HTTPException(
                status_code=422,
                detail={
                    "error_type": "有效数据不足",
                    "message": f"清理NaN值后只剩{len(valid_array1)}个有效数据点,至少需要2个",
                    "original_length": len(array1),
                    "valid_length": len(valid_array1),
                    "solution": "请扩大时间范围或调整采样间隔以获得更多数据",
                },
            )

        # A constant series has zero variance, so every supported coefficient is
        # undefined (NaN). Report that explicitly rather than returning NaN.
        constant_points = [
            point
            for point, values in (
                (request.point1, valid_array1),
                (request.point2, valid_array2),
            )
            if np.all(values == values[0])
        ]
        if constant_points:
            raise HTTPException(
                status_code=422,
                detail={
                    "error_type": "相关系数无法计算",
                    "message": f"数据点 {', '.join(constant_points)} 在有效数据范围内为常数(方差为0),相关系数无定义",
                    "solution": "请扩大时间范围或选择数值有变化的数据点",
                },
            )

        correlation = _compute_correlation(valid_array1, valid_array2, method)
        # Guard against any other non-finite result (e.g. infinities in the data).
        # NaN/inf is not valid JSON and cannot be classified.
        if not np.isfinite(correlation):
            raise HTTPException(
                status_code=422,
                detail={
                    "error_type": "相关系数无法计算",
                    "message": f"{method} 方法计算得到的相关系数不是有限值({correlation})",
                    "solution": "请检查数据中是否包含无穷大等异常值",
                },
            )

        return CorrelationResponse(
            correlation=correlation,
            method=method,
            data_length=len(valid_array1),
            interpretation=_interpret_correlation(correlation),
        )
    except HTTPException:
        # The 4xx responses raised above must pass through unchanged; without this
        # clause the generic handler below would rewrap them as 500s.
        raise
    except ValueError as e:
        # ValueError raised while converting or processing the fetched data
        # (e.g. non-numeric values).
        logger.warning(
            "Correlation request for %s / %s rejected: %s",
            request.point1,
            request.point2,
            e,
        )
        raise HTTPException(
            status_code=422,
            detail={
                "error_type": "参数验证错误",
                "message": str(e),
                "suggestions": [
                    "检查数据点名称是否正确",
                    "确保时间格式正确(YYYY-MM-DD HH:MM:SS)",
                    "验证时间范围是否合理",
                ],
            },
        ) from e
    except Exception as e:
        # Anything else is unexpected: keep the traceback in the logs.
        logger.exception(
            "Unexpected error while computing correlation for %s / %s",
            request.point1,
            request.point2,
        )
        raise HTTPException(
            status_code=500,
            detail={
                "error_type": "计算错误",
                "message": f"相关性计算过程中发生错误: {str(e)}",
                "suggestions": [
                    "检查数据点是否存在",
                    "确认时间范围内有足够的数据",
                    "尝试使用不同的相关性方法",
                ],
            },
        ) from e