Skip to main content
Glama

Data Analysis MCP Server

by boyzhang666
point_history_query.py12.2 kB
import logging import pandas as pd from typing import List, Dict, Any, Optional, Union from datetime import datetime from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel, Field, field_validator from config.config import * from routers.utils.openplant import OpenPlant # 设置日志和路由 router = APIRouter() logger = logging.getLogger("point_history_query") opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout) class SingleHistoryRequest(BaseModel): """单个测点历史数据查询请求模型""" point_name: str = Field(..., description="测点名称", example="W3.NODE1.XX") start_time: str = Field( ..., description="开始时间,格式:YYYY-MM-DD HH:MM:SS", example="2024-01-01 00:00:00", ) end_time: str = Field( ..., description="结束时间,格式:YYYY-MM-DD HH:MM:SS", example="2024-01-01 23:59:59", ) interval: str = Field(default="1m", description="采样间隔", example="1m") fill_method: str = Field( default="outer", description="数据合并方式", example="outer" ) class MultiHistoryRequest(BaseModel): """多个测点历史数据查询请求模型""" point_names: List[str] = Field( ..., description="测点名称列表", example=["W3.NODE1.XX", "W3.NODE1.YY"] ) start_time: str = Field( ..., description="开始时间,格式:YYYY-MM-DD HH:MM:SS", example="2024-01-01 00:00:00", ) end_time: str = Field( ..., description="结束时间,格式:YYYY-MM-DD HH:MM:SS", example="2024-01-01 23:59:59", ) interval: str = Field(default="1m", description="采样间隔", example="1m") fill_method: str = Field( default="outer", description="数据合并方式", example="outer" ) class HistoryDataPoint(BaseModel): """历史数据点模型""" timestamp: str = Field(..., description="时间戳") value: Optional[Union[float, int]] = Field(..., description="数值") class SingleHistoryResponse(BaseModel): """单个测点历史数据查询响应模型""" point_name: str = Field(..., description="测点名称") data_count: int = Field(..., description="数据点数量") data: List[HistoryDataPoint] = Field(..., description="历史数据列表") class MultiHistoryResponse(BaseModel): """多个测点历史数据查询响应模型""" successful_points: List[str] = Field(..., description="成功查询的测点") failed_points: List[str] = Field(..., description="查询失败的测点") data_count: int = Field(..., description="时间序列长度") data: Dict[str, List[HistoryDataPoint]] = Field(..., description="各测点的历史数据") class HistoryDataFrameResponse(BaseModel): """DataFrame格式的历史数据响应模型""" successful_points: List[str] = Field(..., description="成功查询的测点") failed_points: List[str] = Field(..., description="查询失败的测点") data_count: int = Field(..., description="时间序列长度") timestamps: List[str] = Field(..., description="时间戳列表") data_matrix: List[List[Optional[float]]] = Field( ..., description="数据矩阵,每行对应一个时间点,每列对应一个测点" ) def convert_dataframe_to_dict( df: pd.DataFrame, point_names: List[str] ) -> Dict[str, List[HistoryDataPoint]]: """将DataFrame转换为字典格式的历史数据""" result = {} for point_name in point_names: point_data = [] if point_name in df.columns: for timestamp, value in df[point_name].items(): point_data.append( HistoryDataPoint( timestamp=( timestamp.strftime("%Y-%m-%d %H:%M:%S") if pd.notna(timestamp) else "" ), value=float(value) if pd.notna(value) else None, ) ) result[point_name] = point_data return result def convert_dataframe_to_matrix(df: pd.DataFrame, point_names: List[str]) -> tuple: """将DataFrame转换为矩阵格式""" timestamps = [ts.strftime("%Y-%m-%d %H:%M:%S") for ts in df.index] data_matrix = [] for timestamp in df.index: row = [] for point_name in point_names: if point_name in df.columns: value = df.loc[timestamp, point_name] row.append(float(value) if pd.notna(value) else None) else: row.append(None) data_matrix.append(row) return timestamps, data_matrix @router.post( "/api/single_point_history_query", response_model=SingleHistoryResponse, operation_id="single_point_history_query", tags=["历史数据查询"], ) async def single_history_query(request: SingleHistoryRequest): """ 查询单个测点的历史数据 **参数说明:** - **point_name**: 测点名称,例如 "W3.NODE1.XX" - **start_time**: 开始时间,格式:YYYY-MM-DD HH:MM:SS - **end_time**: 结束时间,格式:YYYY-MM-DD HH:MM:SS - **mode**: 查询模式,支持 avg、max、min、span、raw - **interval**: 只有span模式下支持采样间隔,如 "1m"、"5m"、"1h" - **fill_method**: 数据合并方式,如 "outer"、"inner" **返回结果:** - point_name: 测点名称 - data_count: 数据点数量 - data: 历史数据列表,包含时间戳和数值 - query_timestamp: 查询时间戳 **使用示例:** ```json { "point_name": "W3.NODE1.XX", "start_time": "2024-01-01 00:00:00", "end_time": "2024-01-01 23:59:59", "interval": "1m", } ``` """ try: # 使用api_select_to_frame获取数据 df_data = opt.api_select_to_frame( point_list=[request.point_name], start_time=request.start_time, end_time=request.end_time, interval=request.interval, fill_method=request.fill_method, ) # 验证数据获取 if df_data is None or df_data.empty: raise HTTPException( status_code=404, detail={ "error_type": "数据获取失败", "message": f"无法获取测点 {request.point_name} 在指定时间范围内的历史数据", "suggestions": [ "检查测点名称是否正确", "确认时间范围内有数据", "验证数据源连接是否正常", "检查查询模式和采样间隔设置", ], }, ) # 转换数据格式 data_list = [] if request.point_name in df_data.columns: for timestamp, value in df_data[request.point_name].items(): data_list.append( HistoryDataPoint( timestamp=timestamp.strftime("%Y-%m-%d %H:%M:%S"), value=float(value) if pd.notna(value) else None, ) ) data_count = len(data_list) logger.info( f"单测点历史数据查询完成: 测点={request.point_name}, 数据量={data_count}" ) return SingleHistoryResponse( point_name=request.point_name, data_count=data_count, data=data_list ) except HTTPException: raise except Exception as e: logger.error(f"单测点历史数据查询过程中发生错误: {str(e)}") raise HTTPException( status_code=500, detail={ "error_type": "系统错误", "message": f"历史数据查询失败: {str(e)}", "suggestions": ["检查系统状态", "稍后重试", "联系系统管理员"], }, ) @router.post( "/api/multi_points_history_query", response_model=MultiHistoryResponse, operation_id="multi_points_history_query", tags=["历史数据查询"], ) async def multi_history_query(request: MultiHistoryRequest): """ 查询多个测点的历史数据 **参数说明:** - **point_names**: 测点名称列表,最多支持100个测点 - **start_time**: 开始时间,格式:YYYY-MM-DD HH:MM:SS - **end_time**: 结束时间,格式:YYYY-MM-DD HH:MM:SS - **mode**: 查询模式,支持 avg、max、min、span、raw - **interval**: 采样间隔,如 "1m"、"5m"、"1h" - **fill_method**: 数据合并方式,outer或inner **返回结果:** - total_points: 总测点数量 - successful_points: 成功查询的测点列表 - failed_points: 查询失败的测点列表 - data_count: 时间序列长度 - data: 各测点的历史数据字典 **使用示例:** ```json { "point_names": ["W3.NODE1.XX", "W3.NODE1.YY"], "start_time": "2024-01-01 00:00:00", "end_time": "2024-01-01 23:59:59", "mode": "span", "interval": "5m", "fill_method": "outer" } ``` """ try: # 使用api_select_to_frame获取数据 df_data = opt.api_select_to_frame( point_list=request.point_names, start_time=request.start_time, end_time=request.end_time, interval=request.interval, fill_method=request.fill_method, ) # 分析查询结果 successful_points = [] failed_points = [] for point_name in request.point_names: if ( df_data is not None and not df_data.empty and point_name in df_data.columns ): # 检查该列是否有有效数据 if not df_data[point_name].isna().all(): successful_points.append(point_name) else: failed_points.append(point_name) else: failed_points.append(point_name) # 如果没有任何成功的数据 if len(successful_points) == 0: raise HTTPException( status_code=404, detail={ "error_type": "数据获取失败", "message": f"无法获取任何测点在指定时间范围内的历史数据", "failed_points": failed_points, "suggestions": [ "检查测点名称是否正确", "确认时间范围内有数据", "验证数据源连接是否正常", "检查查询模式和采样间隔设置", ], }, ) # 转换数据格式 data_dict = convert_dataframe_to_dict(df_data, request.point_names) data_count = len(df_data) if df_data is not None else 0 # 获取当前时间戳 query_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info( f"多测点历史数据查询完成: 总数={len(request.point_names)}, 成功={len(successful_points)}, 失败={len(failed_points)}, 数据量={data_count}" ) return MultiHistoryResponse( successful_points=successful_points, failed_points=failed_points, data_count=data_count, data=data_dict, ) except HTTPException: raise except Exception as e: logger.error(f"多测点历史数据查询过程中发生错误: {str(e)}") raise HTTPException( status_code=500, detail={ "error_type": "系统错误", "message": f"多测点历史数据查询失败: {str(e)}", "suggestions": [ "检查测点名称列表格式", "确认测点数量不超过100个", "检查系统状态", "稍后重试", ], }, )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boyzhang666/data-analysys-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server