# point_history_query.py
import logging
import pandas as pd
from typing import List, Dict, Optional, Union
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field, field_validator
from config.config import *
from routers.utils.openplant import OpenPlant

# Set up logging, the router, and the OpenPlant client
router = APIRouter()
logger = logging.getLogger("point_history_query")
opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout)
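
# NOTE: the handlers below assume OpenPlant.api_select_to_frame returns a pandas
# DataFrame indexed by timestamp with one column per requested point (inferred
# from how the result is consumed here; see routers.utils.openplant).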


class SingleHistoryRequest(BaseModel):
    """Request model for querying the history data of a single point."""

    point_name: str = Field(..., description="Point name", example="W3.NODE1.XX")
    start_time: str = Field(
        ...,
        description="Start time, format: YYYY-MM-DD HH:MM:SS",
        example="2024-01-01 00:00:00",
    )
    end_time: str = Field(
        ...,
        description="End time, format: YYYY-MM-DD HH:MM:SS",
        example="2024-01-01 23:59:59",
    )
    interval: str = Field(default="1m", description="Sampling interval", example="1m")
    fill_method: str = Field(
        default="outer", description="Data merge method", example="outer"
    )


class MultiHistoryRequest(BaseModel):
    """Request model for querying the history data of multiple points."""

    point_names: List[str] = Field(
        ..., description="List of point names", example=["W3.NODE1.XX", "W3.NODE1.YY"]
    )
    start_time: str = Field(
        ...,
        description="Start time, format: YYYY-MM-DD HH:MM:SS",
        example="2024-01-01 00:00:00",
    )
    end_time: str = Field(
        ...,
        description="End time, format: YYYY-MM-DD HH:MM:SS",
        example="2024-01-01 23:59:59",
    )
    interval: str = Field(default="1m", description="Sampling interval", example="1m")
    fill_method: str = Field(
        default="outer", description="Data merge method", example="outer"
    )
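
    # The multi-point endpoint documents a limit of at most 100 points per request,
    # but the model itself did not enforce it. A minimal sketch of that check
    # (assumption: over-limit requests should be rejected with a validation error):
    @field_validator("point_names")
    @classmethod
    def check_point_count(cls, v: List[str]) -> List[str]:
        if len(v) > 100:
            raise ValueError("At most 100 points are supported per request")
        return v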


class HistoryDataPoint(BaseModel):
    """A single history data point."""

    timestamp: str = Field(..., description="Timestamp")
    value: Optional[Union[float, int]] = Field(..., description="Value (None if missing)")


class SingleHistoryResponse(BaseModel):
    """Response model for a single-point history query."""

    point_name: str = Field(..., description="Point name")
    data_count: int = Field(..., description="Number of data points")
    data: List[HistoryDataPoint] = Field(..., description="List of history data points")


class MultiHistoryResponse(BaseModel):
    """Response model for a multi-point history query."""

    successful_points: List[str] = Field(..., description="Points queried successfully")
    failed_points: List[str] = Field(..., description="Points that failed to query")
    data_count: int = Field(..., description="Length of the time series")
    data: Dict[str, List[HistoryDataPoint]] = Field(
        ..., description="History data per point"
    )


class HistoryDataFrameResponse(BaseModel):
    """Response model for history data in DataFrame (matrix) format."""

    successful_points: List[str] = Field(..., description="Points queried successfully")
    failed_points: List[str] = Field(..., description="Points that failed to query")
    data_count: int = Field(..., description="Length of the time series")
    timestamps: List[str] = Field(..., description="List of timestamps")
    data_matrix: List[List[Optional[float]]] = Field(
        ..., description="Data matrix: one row per timestamp, one column per point"
    )
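
# NOTE: HistoryDataFrameResponse and the convert_dataframe_to_matrix helper below
# support a matrix-format variant of the history query; no endpoint in this module
# currently returns it.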


def convert_dataframe_to_dict(
    df: pd.DataFrame, point_names: List[str]
) -> Dict[str, List[HistoryDataPoint]]:
    """Convert a DataFrame into per-point lists of HistoryDataPoint objects."""
    result = {}
    for point_name in point_names:
        point_data = []
        if point_name in df.columns:
            # Each column is a time-indexed series for one point; missing values
            # become None so they serialize as null in the JSON response.
            for timestamp, value in df[point_name].items():
                point_data.append(
                    HistoryDataPoint(
                        timestamp=(
                            timestamp.strftime("%Y-%m-%d %H:%M:%S")
                            if pd.notna(timestamp)
                            else ""
                        ),
                        value=float(value) if pd.notna(value) else None,
                    )
                )
        result[point_name] = point_data
    return result
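
# Illustrative output shape of convert_dataframe_to_dict (values are hypothetical):
#   {"W3.NODE1.XX": [HistoryDataPoint(timestamp="2024-01-01 00:00:00", value=1.0), ...],
#    "W3.NODE1.YY": []}   # empty list when the column is absent from the DataFrame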


def convert_dataframe_to_matrix(df: pd.DataFrame, point_names: List[str]) -> tuple:
    """Convert a DataFrame into a (timestamps, data_matrix) pair."""
    timestamps = [
        ts.strftime("%Y-%m-%d %H:%M:%S") if pd.notna(ts) else "" for ts in df.index
    ]
    data_matrix = []
    for timestamp in df.index:
        # One row per timestamp, one column per requested point; points missing
        # from the DataFrame are filled with None.
        row = []
        for point_name in point_names:
            if point_name in df.columns:
                value = df.loc[timestamp, point_name]
                row.append(float(value) if pd.notna(value) else None)
            else:
                row.append(None)
        data_matrix.append(row)
    return timestamps, data_matrix
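
# Illustrative shapes returned by convert_dataframe_to_matrix (hypothetical values):
#   timestamps  -> ["2024-01-01 00:00:00", "2024-01-01 00:01:00", ...]
#   data_matrix -> [[1.0, None], [1.2, 3.4], ...]  # rows = timestamps, cols = point_names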


@router.post(
    "/api/single_point_history_query",
    response_model=SingleHistoryResponse,
    operation_id="single_point_history_query",
    tags=["Historical data query"],
)
async def single_history_query(request: SingleHistoryRequest):
    """
    Query the history data of a single point.

    **Parameters:**
    - **point_name**: point name, e.g. "W3.NODE1.XX"
    - **start_time**: start time, format: YYYY-MM-DD HH:MM:SS
    - **end_time**: end time, format: YYYY-MM-DD HH:MM:SS
    - **interval**: sampling interval, e.g. "1m", "5m", "1h"
    - **fill_method**: data merge method, e.g. "outer", "inner"

    **Returns:**
    - point_name: point name
    - data_count: number of data points
    - data: list of history data points, each with a timestamp and a value

    **Example request:**
    ```json
    {
        "point_name": "W3.NODE1.XX",
        "start_time": "2024-01-01 00:00:00",
        "end_time": "2024-01-01 23:59:59",
        "interval": "1m",
        "fill_method": "outer"
    }
    ```
    """
    try:
        # Fetch the data with api_select_to_frame
        df_data = opt.api_select_to_frame(
            point_list=[request.point_name],
            start_time=request.start_time,
            end_time=request.end_time,
            interval=request.interval,
            fill_method=request.fill_method,
        )
        # Verify that data was returned
        if df_data is None or df_data.empty:
            raise HTTPException(
                status_code=404,
                detail={
                    "error_type": "Data retrieval failed",
                    "message": f"No history data for point {request.point_name} in the given time range",
                    "suggestions": [
                        "Check that the point name is correct",
                        "Confirm that data exists in the time range",
                        "Verify that the data source connection is healthy",
                        "Check the sampling interval setting",
                    ],
                },
            )
        # Convert the data format
        data_list = []
        if request.point_name in df_data.columns:
            for timestamp, value in df_data[request.point_name].items():
                data_list.append(
                    HistoryDataPoint(
                        timestamp=timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                        value=float(value) if pd.notna(value) else None,
                    )
                )
        data_count = len(data_list)
        logger.info(
            f"Single-point history query finished: point={request.point_name}, count={data_count}"
        )
        return SingleHistoryResponse(
            point_name=request.point_name, data_count=data_count, data=data_list
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error during single-point history query: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail={
                "error_type": "System error",
                "message": f"History data query failed: {str(e)}",
                "suggestions": ["Check system status", "Retry later", "Contact the system administrator"],
            },
        )
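

# Illustrative request to the single-point endpoint (host and port are placeholders):
#   curl -X POST "http://<host>:<port>/api/single_point_history_query" \
#        -H "Content-Type: application/json" \
#        -d '{"point_name": "W3.NODE1.XX", "start_time": "2024-01-01 00:00:00",
#             "end_time": "2024-01-01 23:59:59", "interval": "1m", "fill_method": "outer"}'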


@router.post(
    "/api/multi_points_history_query",
    response_model=MultiHistoryResponse,
    operation_id="multi_points_history_query",
    tags=["Historical data query"],
)
async def multi_history_query(request: MultiHistoryRequest):
    """
    Query the history data of multiple points.

    **Parameters:**
    - **point_names**: list of point names, at most 100 points per request
    - **start_time**: start time, format: YYYY-MM-DD HH:MM:SS
    - **end_time**: end time, format: YYYY-MM-DD HH:MM:SS
    - **interval**: sampling interval, e.g. "1m", "5m", "1h"
    - **fill_method**: data merge method, "outer" or "inner"

    **Returns:**
    - successful_points: list of points queried successfully
    - failed_points: list of points that failed to query
    - data_count: length of the time series
    - data: dict of history data per point

    **Example request:**
    ```json
    {
        "point_names": ["W3.NODE1.XX", "W3.NODE1.YY"],
        "start_time": "2024-01-01 00:00:00",
        "end_time": "2024-01-01 23:59:59",
        "interval": "5m",
        "fill_method": "outer"
    }
    ```
    """
    try:
        # Fetch the data with api_select_to_frame
        df_data = opt.api_select_to_frame(
            point_list=request.point_names,
            start_time=request.start_time,
            end_time=request.end_time,
            interval=request.interval,
            fill_method=request.fill_method,
        )
        # Analyze the query result
        successful_points = []
        failed_points = []
        for point_name in request.point_names:
            if (
                df_data is not None
                and not df_data.empty
                and point_name in df_data.columns
            ):
                # Check whether the column contains any valid data
                if not df_data[point_name].isna().all():
                    successful_points.append(point_name)
                else:
                    failed_points.append(point_name)
            else:
                failed_points.append(point_name)
        # If no point returned any data
        if len(successful_points) == 0:
            raise HTTPException(
                status_code=404,
                detail={
                    "error_type": "Data retrieval failed",
                    "message": "No history data for any of the requested points in the given time range",
                    "failed_points": failed_points,
                    "suggestions": [
                        "Check that the point names are correct",
                        "Confirm that data exists in the time range",
                        "Verify that the data source connection is healthy",
                        "Check the sampling interval setting",
                    ],
                },
            )
        # Convert the data format
        data_dict = convert_dataframe_to_dict(df_data, request.point_names)
        data_count = len(df_data) if df_data is not None else 0
        logger.info(
            f"Multi-point history query finished: total={len(request.point_names)}, "
            f"succeeded={len(successful_points)}, failed={len(failed_points)}, count={data_count}"
        )
        return MultiHistoryResponse(
            successful_points=successful_points,
            failed_points=failed_points,
            data_count=data_count,
            data=data_dict,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error during multi-point history query: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail={
                "error_type": "System error",
                "message": f"Multi-point history data query failed: {str(e)}",
                "suggestions": [
                    "Check the format of the point name list",
                    "Confirm that no more than 100 points are requested",
                    "Check system status",
                    "Retry later",
                ],
            },
        )