Skip to main content
Glama

Data Analysis MCP Server

by boyzhang666
single_outlier_detection.py (7.72 kB)
"""Single-point outlier detection endpoint.

Reads one point's samples through the module-level OpenPlant client, flags
outliers with one or more statistical rules (z-score, modified z-score, IQR
fences), treats NaN samples as outliers as well, and returns the combined
result.
"""

import logging
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from pydantic import BaseModel, Field
from fastapi import HTTPException, APIRouter

from config.config import *
from routers.utils.openplant import OpenPlant

# Router, logger and the shared OpenPlant client (connection settings come
# from the star-imported config module).
router = APIRouter()
logger = logging.getLogger("single_outlier_detection")
opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout)

# Default cut-offs of the individual detectors.
# NOTE(review): the sigma rule was originally documented as the "3-sigma"
# method but has always used 1.5; the value is kept for compatibility --
# confirm which one is intended.
SIGMA_Z_THRESHOLD = 1.5
MAD_Z_THRESHOLD = 3.0
IQR_FENCE_FACTOR = 1.5


class OutlierDetectionRequest(BaseModel):
    """Request model for outlier detection.

    Used to detect abnormal values and outliers in numeric data. Typical use
    cases: data quality checks, outlier identification, data preprocessing.
    """

    gn_point_name: str = Field(
        ..., description="OpenPlant数据点名称,格式为 W3.NODE1.XX"
    )
    start_time: str = Field(..., description="分析起始时间,格式为YYYY-MM-DD HH:MM:SS")
    end_time: str = Field(..., description="分析结束时间,格式为YYYY-MM-DD HH:MM:SS")
    interval: str = Field(..., description="数据采样间隔,例如'1m'表示1分钟")
    fill_method: str = Field(
        default="outer", description="缺失值填充方法,例如'outer'表示外插"
    )
    methods: Optional[List[str]] = Field(
        default=None,
        description="异常值检测方法列表,如 ['sigma', 'mad', 'iqr'],不填则默认全部组合",
    )


class OutlierDetectionResponse(BaseModel):
    """Response model for outlier detection."""

    outliers: List[float] = Field(..., description="检测到的异常值列表")
    outlier_count: int = Field(..., description="异常值数量")
    data_length: int = Field(..., description="数据点总数")
    outlier_rate: float = Field(..., description="异常值比例(%)")
    methods_used: List[str] = Field(..., description="使用的检测方法")
    summary: str = Field(..., description="检测结果摘要")


# ===================== Detection helpers =====================
def _sigma_mask(data: np.ndarray, threshold: float = SIGMA_Z_THRESHOLD) -> np.ndarray:
    """Return a boolean mask of points whose absolute z-score exceeds *threshold*."""
    std_val = np.std(data)
    # Zero (or NaN) spread: no point can be ranked, so nothing is flagged.
    if not std_val > 0:
        return np.zeros(data.shape, dtype=bool)
    return np.abs((data - np.mean(data)) / std_val) > threshold


def _mad_mask(data: np.ndarray, threshold: float = MAD_Z_THRESHOLD) -> np.ndarray:
    """Return a boolean mask of points whose modified z-score exceeds *threshold*."""
    median_val = np.median(data)
    mad = np.median(np.abs(data - median_val))
    # Zero (or NaN) median absolute deviation: nothing is flagged.
    if not mad > 0:
        return np.zeros(data.shape, dtype=bool)
    return 0.6745 * np.abs(data - median_val) / mad > threshold


def _iqr_mask(data: np.ndarray, k: float = IQR_FENCE_FACTOR) -> np.ndarray:
    """Return a boolean mask of points outside ``[Q1 - k*IQR, Q3 + k*IQR]``."""
    q1 = np.percentile(data, 25)
    q3 = np.percentile(data, 75)
    iqr = q3 - q1
    return (data < q1 - k * iqr) | (data > q3 + k * iqr)


# Method name accepted in the request -> mask builder using default cut-offs.
_MASK_BUILDERS = {
    "sigma": _sigma_mask,
    "mad": _mad_mask,
    "iqr": _iqr_mask,
}


def detect_sigma_outliers(
    data: np.ndarray, threshold: float = SIGMA_Z_THRESHOLD
) -> list:
    """Detect outliers by z-score.

    Args:
        data: Samples to inspect.
        threshold: Points with ``|x - mean| / std`` strictly greater than
            this value are reported. Defaults to 1.5.

    Returns:
        The flagged values in input order; empty when the standard deviation
        is not positive.
    """
    data = np.asarray(data)
    return data[_sigma_mask(data, threshold)].tolist()


def detect_mad_outliers(data: np.ndarray, threshold: float = MAD_Z_THRESHOLD) -> list:
    """Detect outliers by modified z-score (median / MAD based).

    Args:
        data: Samples to inspect.
        threshold: Points with ``0.6745 * |x - median| / MAD`` strictly
            greater than this value are reported. Defaults to 3.0.

    Returns:
        The flagged values in input order; empty when the MAD is not positive.
    """
    data = np.asarray(data)
    return data[_mad_mask(data, threshold)].tolist()


def detect_iqr_outliers(data: np.ndarray, k: float = IQR_FENCE_FACTOR) -> list:
    """Detect outliers with inter-quartile-range fences.

    Args:
        data: Samples to inspect.
        k: Fence multiplier; points below ``Q1 - k*IQR`` or above
            ``Q3 + k*IQR`` are reported. Defaults to 1.5.

    Returns:
        The flagged values in input order.
    """
    data = np.asarray(data)
    return data[_iqr_mask(data, k)].tolist()


def get_outliers_by_methods(
    data: np.ndarray, nan_values: np.ndarray, methods: list
) -> tuple:
    """Run the selected detectors and merge their findings.

    The merge is a union over data points (boolean masks), not over values:
    a point flagged by several methods is reported once, while distinct
    points that share the same value are each reported. This keeps the
    outlier count comparable with the total number of samples.

    Args:
        data: Valid (non-NaN) samples.
        nan_values: The NaN samples removed from the raw data; when non-empty
            they are appended to the outliers under the method "NaN识别".
        methods: Method names ("sigma", "mad", "iqr"). A falsy value selects
            all of them. Unknown names are skipped with a warning.

    Returns:
        A tuple ``(outliers, methods_used, outlier_details)`` where
        ``outliers`` is the merged list of flagged values, ``methods_used``
        lists the methods that flagged at least one point, and
        ``outlier_details`` holds one ``{"method", "outliers"}`` dict per
        such method.
    """
    data = np.asarray(data)
    selected_methods = methods if methods else list(_MASK_BUILDERS)

    combined = np.zeros(data.shape, dtype=bool)
    methods_used: List[str] = []
    outlier_details: List[Dict[str, Any]] = []

    for method in selected_methods:
        build_mask = _MASK_BUILDERS.get(method)
        if build_mask is None:
            logger.warning("Ignoring unknown outlier detection method: %r", method)
            continue
        mask = build_mask(data)
        # A method only counts as "used" when it actually flagged something.
        if mask.any():
            combined |= mask
            outlier_details.append(
                {"method": method, "outliers": data[mask].tolist()}
            )
            methods_used.append(method)

    outliers = data[combined].tolist()

    # NaN samples are reported as outliers too.
    # NOTE(review): NaN entries in the returned list may be rejected by strict
    # JSON encoders when the response is rendered -- confirm.
    if len(nan_values) > 0:
        nan_list = [float(value) for value in nan_values]
        outliers.extend(nan_list)
        methods_used.append("NaN识别")
        outlier_details.append({"method": "NaN识别", "outliers": nan_list})

    return outliers, methods_used, outlier_details


def _create_error_response(error_type: str, message: str) -> OutlierDetectionResponse:
    """Build an empty result whose summary carries *message*; log the cause."""
    logger.warning("%s: %s", error_type, message)
    return OutlierDetectionResponse(
        outliers=[],
        outlier_count=0,
        data_length=0,
        outlier_rate=0.0,
        methods_used=[],
        summary=f"错误:{message}",
    )


# ===================== Main detection endpoint =====================
@router.post(
    "/api/outlier_detection",
    response_model=OutlierDetectionResponse,
    operation_id="single_point_outlier_detection",
    tags=["异常值检测"],
)
async def outlier_detection(request: OutlierDetectionRequest):
    """Detect outliers in one point's numeric data using combined methods.

    Returns an ``OutlierDetectionResponse``. When no data is available or
    fewer than 5 valid samples remain, an empty result with an error summary
    is returned instead of raising.

    Raises:
        HTTPException: 422 when a ``ValueError`` occurs (e.g. the samples
            cannot be converted to floats), 500 for any other failure.
    """
    try:
        # Fetch the samples for the requested point.
        # NOTE(review): this call is not awaited, so it presumably blocks the
        # event loop while the query runs -- confirm.
        df_data = opt.api_select_to_frame(
            [request.gn_point_name],
            request.start_time,
            request.end_time,
            "span",
            request.interval,
            fill_method=request.fill_method,
        )

        # Data checks
        if df_data is None or df_data.empty:
            return _create_error_response(
                "数据获取失败", f"无法获取数据点 {request.gn_point_name} 的数据"
            )

        # Coerce to float so non-numeric content surfaces as ValueError (422)
        # rather than a TypeError from np.isnan.
        raw_data = np.asarray(df_data[request.gn_point_name], dtype=float)
        if len(raw_data) == 0:
            return _create_error_response("数据为空", "未获取到任何数据")

        nan_mask = np.isnan(raw_data)
        nan_values = raw_data[nan_mask]
        valid_data = raw_data[~nan_mask]

        if len(valid_data) < 5:
            return _create_error_response(
                "数据不足", f"有效数据只有{len(valid_data)}个,需要至少5个数据点"
            )

        # Outlier detection. The per-method details are computed but not
        # returned; extend the response model to expose them if needed.
        all_outliers, methods_used, _outlier_details = get_outliers_by_methods(
            valid_data, nan_values, request.methods
        )

        # Statistics (total_count > 0 is guaranteed by the checks above).
        total_count = len(raw_data)
        outlier_count = len(all_outliers)
        outlier_rate = round((outlier_count / total_count) * 100, 2)

        if outlier_count == 0:
            summary = "数据正常,未发现异常值"
        elif outlier_rate < 5:
            summary = f"发现少量异常值:{outlier_count}个({outlier_rate}%)"
        elif outlier_rate < 15:
            summary = f"发现适量异常值:{outlier_count}个({outlier_rate}%)"
        else:
            summary = f"发现大量异常值:{outlier_count}个({outlier_rate}%),建议检查数据质量"

        return OutlierDetectionResponse(
            outliers=all_outliers,
            outlier_count=outlier_count,
            data_length=total_count,
            outlier_rate=outlier_rate,
            methods_used=methods_used,
            summary=summary,
        )

    except ValueError as e:
        logger.warning("Outlier detection rejected invalid input: %s", e)
        raise HTTPException(
            status_code=422,
            detail={
                "error_type": "参数验证错误",
                "message": str(e),
                "suggestions": [
                    "检查数据格式是否正确",
                    "确保数据为数值类型",
                    "验证参数在有效范围内",
                ],
            },
        ) from e
    except Exception as e:
        # Top-level boundary: record the traceback before translating to HTTP.
        logger.exception("Outlier detection failed")
        raise HTTPException(
            status_code=500,
            detail={
                "error_type": "计算错误",
                "message": f"异常值检测过程中发生错误: {str(e)}",
                "suggestions": [
                    "检查数据是否包含异常值",
                    "确认数据类型正确",
                    "尝试使用不同的检测方法",
                ],
            },
        ) from e

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boyzhang666/data-analysys-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.