point_realtime_query.py•8.96 kB
import logging
from typing import List, Dict, Any, Optional, Union
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field, field_validator
from config.config import *
from routers.utils.openplant import OpenPlant
# 设置日志和路由
router = APIRouter()
logger = logging.getLogger("point_realtime_query")
opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout)
class SingleRealtimeRequest(BaseModel):
"""单个测点实时值查询请求模型"""
point_name: str = Field(..., description="测点名称", example="W3.NODE1.XX")
type_: str = Field(default="GN", description="查询类型,默认为GN", example="GN")
class MultiRealtimeRequest(BaseModel):
"""多个测点实时值查询请求模型"""
point_names: List[str] = Field(
...,
description="测点名称列表",
example=["W3.NODE1.XX", "W3.NODE1.YY", "W3.NODE1.ZZ"],
)
type_: str = Field(default="GN", description="查询类型,默认为GN", example="GN")
@field_validator("point_names")
@classmethod
def validate_point_names(cls, v):
if len(v) == 0:
raise ValueError("测点名称列表不能为空")
if len(v) > 50:
raise ValueError("最多支持50个测点的同时查询")
return v
class RealtimeDataPoint(BaseModel):
"""实时数据点模型"""
point_name: str = Field(..., description="测点名称")
value: Optional[Union[float, int, str]] = Field(..., description="实时值")
status: str = Field(..., description="查询状态:success/error")
error_message: Optional[str] = Field(None, description="错误信息(如果有)")
class SingleRealtimeResponse(BaseModel):
"""单个测点实时值查询响应模型"""
point_name: str = Field(..., description="测点名称")
value: Optional[Union[float, int, str]] = Field(..., description="实时值")
type_: str = Field(..., description="查询类型")
status: str = Field(..., description="查询状态")
timestamp: str = Field(..., description="查询时间戳")
class MultiRealtimeResponse(BaseModel):
"""多个测点实时值查询响应模型"""
results: List[RealtimeDataPoint] = Field(..., description="查询结果列表")
total_count: int = Field(..., description="总查询数量")
success_count: int = Field(..., description="成功查询数量")
error_count: int = Field(..., description="失败查询数量")
type_: str = Field(..., description="查询类型")
timestamp: str = Field(..., description="查询时间戳")
def query_single_realtime_value(point_name: str, type_: str = "GN") -> Dict[str, Any]:
"""查询单个测点的实时值"""
try:
value = opt.api_select_realtime(point_name, type_)
return {
"point_name": point_name,
"value": value,
"status": "success",
"error_message": None,
}
except Exception as e:
logger.error(f"查询测点 {point_name} 实时值失败: {str(e)}")
return {
"point_name": point_name,
"value": None,
"status": "error",
"error_message": str(e),
}
@router.post(
"/api/single_point_realtime_query",
response_model=SingleRealtimeResponse,
operation_id="single_point_realtime_query",
tags=["实时值查询"],
)
async def single_realtime_query(request: SingleRealtimeRequest):
"""
查询单个测点的实时值
**参数说明:**
- **point_name**: 测点名称,例如 "W3.NODE1.XX"
- **type_**: 查询类型,默认为 "GN"
**返回结果:**
- point_name: 测点名称
- value: 实时值(可能是数字或字符串)
- type_: 查询类型
- status: 查询状态(success/error)
- timestamp: 查询时间戳
**使用示例:**
```json
{
"point_name": "W3.NODE1.XX",
"type_": "GN"
}
```
**响应示例:**
```json
{
"point_name": "W3.NODE1.XX",
"value": 25.6,
"type_": "GN",
"status": "success",
"timestamp": "2024-01-15 14:30:25"
}
```
"""
try:
from datetime import datetime
# 查询实时值
result = query_single_realtime_value(request.point_name, request.type_)
# 获取当前时间戳
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if result["status"] == "error":
raise HTTPException(
status_code=500,
detail={
"error_type": "实时值查询错误",
"message": f"查询测点 {request.point_name} 实时值失败: {result['error_message']}",
"suggestions": [
"检查测点名称是否正确",
"确认测点是否存在",
"检查数据库连接是否正常",
],
},
)
return SingleRealtimeResponse(
point_name=result["point_name"],
value=result["value"],
type_=request.type_,
status=result["status"],
timestamp=timestamp,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"单个测点实时值查询过程中发生错误: {str(e)}")
raise HTTPException(
status_code=500,
detail={
"error_type": "系统错误",
"message": f"实时值查询失败: {str(e)}",
"suggestions": ["检查系统状态", "稍后重试", "联系系统管理员"],
},
)
@router.post(
"/api/multi_points_realtime_query",
response_model=MultiRealtimeResponse,
operation_id="multi_points_realtime_query",
tags=["实时值查询"],
)
async def multi_realtime_query(request: MultiRealtimeRequest):
"""
查询多个测点的实时值
**参数说明:**
- **point_names**: 测点名称列表,最多支持50个测点
- **type_**: 查询类型,默认为 "GN"
**返回结果:**
- results: 查询结果列表,包含每个测点的详细信息
- total_count: 总查询数量
- success_count: 成功查询数量
- error_count: 失败查询数量
- type_: 查询类型
- timestamp: 查询时间戳
**使用示例:**
```json
{
"point_names": ["W3.NODE1.XX", "W3.NODE1.YY", "W3.NODE1.ZZ"],
"type_": "GN"
}
```
**响应示例:**
```json
{
"results": [
{
"point_name": "W3.NODE1.XX",
"value": 25.6,
"status": "success",
"error_message": null
},
{
"point_name": "W3.NODE1.YY",
"value": 30.2,
"status": "success",
"error_message": null
}
],
"total_count": 2,
"success_count": 2,
"error_count": 0,
"type_": "GN",
"timestamp": "2024-01-15 14:30:25"
}
```
"""
try:
from datetime import datetime
# 批量查询实时值
results = []
success_count = 0
error_count = 0
for point_name in request.point_names:
result = query_single_realtime_value(point_name, request.type_)
if result["status"] == "success":
success_count += 1
else:
error_count += 1
results.append(
RealtimeDataPoint(
point_name=result["point_name"],
value=result["value"],
status=result["status"],
error_message=result["error_message"],
)
)
# 获取当前时间戳
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(
f"多测点实时值查询完成: 总数={len(request.point_names)}, 成功={success_count}, 失败={error_count}"
)
return MultiRealtimeResponse(
results=results,
total_count=len(request.point_names),
success_count=success_count,
error_count=error_count,
type_=request.type_,
timestamp=timestamp,
)
except Exception as e:
logger.error(f"多测点实时值查询过程中发生错误: {str(e)}")
raise HTTPException(
status_code=500,
detail={
"error_type": "系统错误",
"message": f"多测点实时值查询失败: {str(e)}",
"suggestions": [
"检查测点名称列表格式",
"确认测点数量不超过50个",
"检查系统状态",
"稍后重试",
],
},
)