# single_outlier_detection.py
import logging
import numpy as np
from typing import List, Optional
from pydantic import BaseModel, Field
from fastapi import HTTPException, APIRouter
from config.config import *
from routers.utils.openplant import OpenPlant
# Module-level setup: API router, module logger, and the OpenPlant client.
router = APIRouter()
logger = logging.getLogger("single_outlier_detection")
# config_host / config_port / config_timeout are not defined in this file;
# presumably they are supplied by `from config.config import *` — verify.
opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout)
class OutlierDetectionRequest(BaseModel):
    """Request model for outlier detection.

    Used to detect anomalous values and outliers in numeric data.
    Typical use cases: data quality checks, outlier identification, data
    preprocessing, and similar tasks.
    """

    # Point to analyse; the endpoint also uses it as the column key of the
    # DataFrame returned by the OpenPlant client.
    gn_point_name: str = Field(
        ..., description="OpenPlant数据点名称,格式为 W3.NODE1.XX"
    )
    # Start/end of the query window; forwarded unchanged to
    # opt.api_select_to_frame by the endpoint.
    start_time: str = Field(..., description="分析起始时间,格式为YYYY-MM-DD HH:MM:SS")
    end_time: str = Field(..., description="分析结束时间,格式为YYYY-MM-DD HH:MM:SS")
    # Sampling interval string, forwarded unchanged to the OpenPlant client.
    interval: str = Field(..., description="数据采样间隔,例如'1m'表示1分钟")
    # Passed to the OpenPlant client as its fill_method keyword argument.
    fill_method: str = Field(
        default="outer", description="缺失值填充方法,例如'outer'表示外插"
    )
    # Detector names to run; None (or an empty list) makes
    # get_outliers_by_methods run every detector it knows.
    methods: Optional[List[str]] = Field(
        default=None,
        description="异常值检测方法列表,如 ['sigma', 'mad', 'iqr'],不填则默认全部组合",
    )
class OutlierDetectionResponse(BaseModel):
    """Response model for outlier detection."""

    # Values reported as outliers (the endpoint also includes NaN samples).
    outliers: List[float] = Field(..., description="检测到的异常值列表")
    # Number of entries in `outliers`.
    outlier_count: int = Field(..., description="异常值数量")
    # Total number of samples fetched, NaN samples included.
    data_length: int = Field(..., description="数据点总数")
    # outlier_count / data_length expressed as a percentage, rounded to 2 dp.
    outlier_rate: float = Field(..., description="异常值比例(%)")
    # Names of the detectors that reported at least one outlier.
    methods_used: List[str] = Field(..., description="使用的检测方法")
    # Human-readable verdict, or an error message prefixed by the endpoint.
    summary: str = Field(..., description="检测结果摘要")
# ===================== Helper detection methods =====================
def detect_sigma_outliers(data: np.ndarray, threshold: float = 1.5) -> list:
    """Flag values whose absolute z-score exceeds ``threshold``.

    The z-score is ``|x - mean| / std`` with the population standard
    deviation (the ``np.std`` default).

    The default cut-off is 1.5 standard deviations, which is the value this
    detector has always applied. That is much stricter than the classic
    three-sigma rule the function was previously documented as; pass
    ``threshold=3.0`` to get the three-sigma rule.

    Args:
        data: Numeric samples (any array-like convertible to float).
        threshold: Number of standard deviations beyond which a value is
            reported as an outlier.

    Returns:
        The flagged values in input order. Empty when the input is empty,
        when all values are identical (zero spread), or when the spread is
        NaN (e.g. the input contains NaN).
    """
    values = np.asarray(data, dtype=float)
    # Guard explicitly so empty input does not trigger numpy's
    # "mean of empty slice" runtime warnings.
    if values.size == 0:
        return []

    std_val = np.std(values)
    # `not (std > 0)` also rejects a NaN spread, matching the original check.
    if not std_val > 0:
        return []

    z_scores = np.abs(values - np.mean(values)) / std_val
    return list(values[z_scores > threshold])
def detect_mad_outliers(data: np.ndarray) -> list:
    """Flag values whose modified z-score exceeds 3.0.

    The modified z-score is ``0.6745 * |x - median| / MAD``, where MAD is the
    median of the absolute deviations from the median.

    Args:
        data: Numeric samples as a numpy array.

    Returns:
        The flagged values in input order, or an empty list when the MAD is
        not strictly positive (e.g. more than half the values are equal).
    """
    center = np.median(data)
    abs_deviation = np.abs(data - center)
    spread = np.median(abs_deviation)

    # A zero (or NaN) MAD would make the score undefined; report nothing.
    if not spread > 0:
        return []

    scores = 0.6745 * abs_deviation / spread
    is_outlier = scores > 3.0
    return list(data[is_outlier])
def detect_iqr_outliers(data: np.ndarray) -> list:
    """Flag values lying more than 1.5 * IQR outside the quartiles.

    Args:
        data: Numeric samples as a numpy array.

    Returns:
        Values strictly below ``Q1 - 1.5 * IQR`` or strictly above
        ``Q3 + 1.5 * IQR``, in input order.
    """
    first_quartile, third_quartile = np.percentile(data, [25, 75])
    spread = third_quartile - first_quartile

    fence_low = first_quartile - 1.5 * spread
    fence_high = third_quartile + 1.5 * spread

    outside = np.logical_or(data < fence_low, data > fence_high)
    return list(data[outside])
def get_outliers_by_methods(
    data: np.ndarray, nan_values: np.ndarray, methods: list
) -> tuple:
    """Run the requested detectors and merge their findings per data point.

    Args:
        data: Valid (non-NaN) samples to analyse.
        nan_values: The NaN samples that were filtered out of ``data``; when
            non-empty they are reported as outliers as well.
        methods: Detector names to run, chosen from ``"sigma"``, ``"mad"``
            and ``"iqr"``. ``None`` or an empty list runs all of them.

    Returns:
        A ``(outliers, methods_used, outlier_details)`` tuple:

        * ``outliers`` - every sample flagged by at least one detector, in
          input order, followed by the NaN samples. A sample flagged by
          several detectors appears once; repeated readings of the same
          value appear once per reading.
        * ``methods_used`` - names of the detectors that flagged something
          (plus ``"NaN识别"`` when NaN samples were supplied).
        * ``outlier_details`` - one ``{"method", "outliers"}`` dict per
          entry of ``methods_used``.

    Raises:
        ValueError: If ``methods`` contains a name that is not a known
            detector. Silently skipping it would report "no outliers" for a
            check that never ran.
    """
    method_map = {
        "sigma": detect_sigma_outliers,
        "mad": detect_mad_outliers,
        "iqr": detect_iqr_outliers,
    }
    selected_methods = methods if methods else list(method_map)

    unknown = [name for name in selected_methods if name not in method_map]
    if unknown:
        raise ValueError(
            f"不支持的检测方法: {unknown},可选方法: {list(method_map)}"
        )

    samples = np.asarray(data)
    methods_used = []
    outlier_details = []

    # Merge detector results as a per-point mask rather than a set of values.
    # A value-keyed set collapsed repeated readings into a single entry while
    # NaN entries (which never compare equal to each other) stayed one per
    # point, so the resulting count mixed "unique values" with "points".
    # Every detector decides purely on a sample's value, so matching the
    # flagged values back against the samples recovers the exact point set.
    flagged = np.zeros(samples.shape, dtype=bool)
    for method in selected_methods:
        result = method_map[method](samples)
        if len(result) == 0:
            continue
        flagged |= np.isin(samples, result)
        outlier_details.append({"method": method, "outliers": result})
        methods_used.append(method)

    all_outliers = samples[flagged].tolist()

    # NaN samples count as outliers in their own right.
    if len(nan_values) > 0:
        nan_list = list(nan_values)
        all_outliers.extend(nan_list)
        methods_used.append("NaN识别")
        outlier_details.append({"method": "NaN识别", "outliers": nan_list})

    return all_outliers, methods_used, outlier_details
# ===================== Main detection endpoint =====================
def _build_error_response(error_type: str, message: str):
    """Log a data problem and wrap it in an empty detection result."""
    logger.warning("outlier_detection: %s - %s", error_type, message)
    return OutlierDetectionResponse(
        outliers=[],
        outlier_count=0,
        data_length=0,
        outlier_rate=0.0,
        methods_used=[],
        summary=f"错误:{message}",
    )


def _summarize_outliers(outlier_count: int, outlier_rate: float) -> str:
    """Pick the summary sentence for a given outlier count and rate (%)."""
    if outlier_count == 0:
        return "数据正常,未发现异常值"
    if outlier_rate < 5:
        return f"发现少量异常值:{outlier_count}个({outlier_rate}%)"
    if outlier_rate < 15:
        return f"发现适量异常值:{outlier_count}个({outlier_rate}%)"
    return f"发现大量异常值:{outlier_count}个({outlier_rate}%),建议检查数据质量"


@router.post(
    "/api/outlier_detection",
    response_model=OutlierDetectionResponse,
    operation_id="single_point_outlier_detection",
    tags=["异常值检测"],
    # Pinned explicitly so the published API description stays the same text
    # regardless of the wording of the docstring below.
    description="检测数值数据中的异常值,使用组合检测方法",
)
async def outlier_detection(request: OutlierDetectionRequest):
    """Detect outliers in one OpenPlant point using the combined detectors.

    Fetches the point's samples for the requested window, separates NaN
    samples from valid ones, runs the requested detectors on the valid
    samples and reports the merged result.

    Args:
        request: Point name, time window, sampling interval, fill method and
            optional list of detector names.

    Returns:
        An ``OutlierDetectionResponse``. Data problems (no data, empty
        series, fewer than 5 valid samples) are reported as a normal
        response whose ``summary`` starts with ``错误:`` and whose counters
        are all zero.

    Raises:
        HTTPException: 422 when a ``ValueError`` is raised while processing,
            500 for any other failure.
    """
    try:
        # Fetch the samples from the database.
        # NOTE(review): this looks like a synchronous call inside an async
        # handler, which would block the event loop while it runs — confirm.
        df_data = opt.api_select_to_frame(
            [request.gn_point_name],
            request.start_time,
            request.end_time,
            "span",
            request.interval,
            fill_method=request.fill_method,
        )

        # Data checks.
        if df_data is None or df_data.empty:
            return _build_error_response(
                "数据获取失败", f"无法获取数据点 {request.gn_point_name} 的数据"
            )

        raw_data = np.array(df_data[request.gn_point_name])
        if len(raw_data) == 0:
            return _build_error_response("数据为空", "未获取到任何数据")

        nan_mask = np.isnan(raw_data)
        nan_values = raw_data[nan_mask]
        valid_data = raw_data[~nan_mask]
        if len(valid_data) < 5:
            return _build_error_response(
                "数据不足", f"有效数据只有{len(valid_data)}个,需要至少5个数据点"
            )

        # Outlier detection; the per-method breakdown is not part of the
        # response model, so it is discarded here.
        all_outliers, methods_used, _ = get_outliers_by_methods(
            valid_data, nan_values, request.methods
        )

        # Statistics. total_count is non-zero here (checked above).
        total_count = len(raw_data)
        outlier_count = len(all_outliers)
        outlier_rate = round((outlier_count / total_count) * 100, 2)

        # NOTE(review): when NaN samples are present they are included in
        # `outliers`; confirm the JSON encoder in use can serialise NaN.
        return OutlierDetectionResponse(
            outliers=all_outliers,
            outlier_count=outlier_count,
            data_length=total_count,
            outlier_rate=outlier_rate,
            methods_used=methods_used,
            summary=_summarize_outliers(outlier_count, outlier_rate),
        )
    except ValueError as e:
        logger.warning(
            "outlier_detection rejected for %s: %s", request.gn_point_name, e
        )
        raise HTTPException(
            status_code=422,
            detail={
                "error_type": "参数验证错误",
                "message": str(e),
                "suggestions": [
                    "检查数据格式是否正确",
                    "确保数据为数值类型",
                    "验证参数在有效范围内",
                ],
            },
        ) from e
    except Exception as e:
        # Top-level boundary: record the traceback before converting to 500.
        logger.exception("outlier_detection failed for %s", request.gn_point_name)
        raise HTTPException(
            status_code=500,
            detail={
                "error_type": "计算错误",
                "message": f"异常值检测过程中发生错误: {str(e)}",
                "suggestions": [
                    "检查数据是否包含异常值",
                    "确认数据类型正确",
                    "尝试使用不同的检测方法",
                ],
            },
        ) from e