Skip to main content
Glama

Data Analysis MCP Server

by boyzhang666
point_realtime_query.py8.96 kB
import logging from typing import List, Dict, Any, Optional, Union from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field, field_validator from config.config import * from routers.utils.openplant import OpenPlant # 设置日志和路由 router = APIRouter() logger = logging.getLogger("point_realtime_query") opt = OpenPlant(host=config_host, port=config_port, timeout=config_timeout) class SingleRealtimeRequest(BaseModel): """单个测点实时值查询请求模型""" point_name: str = Field(..., description="测点名称", example="W3.NODE1.XX") type_: str = Field(default="GN", description="查询类型,默认为GN", example="GN") class MultiRealtimeRequest(BaseModel): """多个测点实时值查询请求模型""" point_names: List[str] = Field( ..., description="测点名称列表", example=["W3.NODE1.XX", "W3.NODE1.YY", "W3.NODE1.ZZ"], ) type_: str = Field(default="GN", description="查询类型,默认为GN", example="GN") @field_validator("point_names") @classmethod def validate_point_names(cls, v): if len(v) == 0: raise ValueError("测点名称列表不能为空") if len(v) > 50: raise ValueError("最多支持50个测点的同时查询") return v class RealtimeDataPoint(BaseModel): """实时数据点模型""" point_name: str = Field(..., description="测点名称") value: Optional[Union[float, int, str]] = Field(..., description="实时值") status: str = Field(..., description="查询状态:success/error") error_message: Optional[str] = Field(None, description="错误信息(如果有)") class SingleRealtimeResponse(BaseModel): """单个测点实时值查询响应模型""" point_name: str = Field(..., description="测点名称") value: Optional[Union[float, int, str]] = Field(..., description="实时值") type_: str = Field(..., description="查询类型") status: str = Field(..., description="查询状态") timestamp: str = Field(..., description="查询时间戳") class MultiRealtimeResponse(BaseModel): """多个测点实时值查询响应模型""" results: List[RealtimeDataPoint] = Field(..., description="查询结果列表") total_count: int = Field(..., description="总查询数量") success_count: int = Field(..., description="成功查询数量") error_count: int = Field(..., description="失败查询数量") type_: str = Field(..., description="查询类型") timestamp: str = Field(..., description="查询时间戳") def query_single_realtime_value(point_name: str, type_: str = "GN") -> Dict[str, Any]: """查询单个测点的实时值""" try: value = opt.api_select_realtime(point_name, type_) return { "point_name": point_name, "value": value, "status": "success", "error_message": None, } except Exception as e: logger.error(f"查询测点 {point_name} 实时值失败: {str(e)}") return { "point_name": point_name, "value": None, "status": "error", "error_message": str(e), } @router.post( "/api/single_point_realtime_query", response_model=SingleRealtimeResponse, operation_id="single_point_realtime_query", tags=["实时值查询"], ) async def single_realtime_query(request: SingleRealtimeRequest): """ 查询单个测点的实时值 **参数说明:** - **point_name**: 测点名称,例如 "W3.NODE1.XX" - **type_**: 查询类型,默认为 "GN" **返回结果:** - point_name: 测点名称 - value: 实时值(可能是数字或字符串) - type_: 查询类型 - status: 查询状态(success/error) - timestamp: 查询时间戳 **使用示例:** ```json { "point_name": "W3.NODE1.XX", "type_": "GN" } ``` **响应示例:** ```json { "point_name": "W3.NODE1.XX", "value": 25.6, "type_": "GN", "status": "success", "timestamp": "2024-01-15 14:30:25" } ``` """ try: from datetime import datetime # 查询实时值 result = query_single_realtime_value(request.point_name, request.type_) # 获取当前时间戳 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if result["status"] == "error": raise HTTPException( status_code=500, detail={ "error_type": "实时值查询错误", "message": f"查询测点 {request.point_name} 实时值失败: {result['error_message']}", "suggestions": [ "检查测点名称是否正确", "确认测点是否存在", "检查数据库连接是否正常", ], }, ) return SingleRealtimeResponse( point_name=result["point_name"], value=result["value"], type_=request.type_, status=result["status"], timestamp=timestamp, ) except HTTPException: raise except Exception as e: logger.error(f"单个测点实时值查询过程中发生错误: {str(e)}") raise HTTPException( status_code=500, detail={ "error_type": "系统错误", "message": f"实时值查询失败: {str(e)}", "suggestions": ["检查系统状态", "稍后重试", "联系系统管理员"], }, ) @router.post( "/api/multi_points_realtime_query", response_model=MultiRealtimeResponse, operation_id="multi_points_realtime_query", tags=["实时值查询"], ) async def multi_realtime_query(request: MultiRealtimeRequest): """ 查询多个测点的实时值 **参数说明:** - **point_names**: 测点名称列表,最多支持50个测点 - **type_**: 查询类型,默认为 "GN" **返回结果:** - results: 查询结果列表,包含每个测点的详细信息 - total_count: 总查询数量 - success_count: 成功查询数量 - error_count: 失败查询数量 - type_: 查询类型 - timestamp: 查询时间戳 **使用示例:** ```json { "point_names": ["W3.NODE1.XX", "W3.NODE1.YY", "W3.NODE1.ZZ"], "type_": "GN" } ``` **响应示例:** ```json { "results": [ { "point_name": "W3.NODE1.XX", "value": 25.6, "status": "success", "error_message": null }, { "point_name": "W3.NODE1.YY", "value": 30.2, "status": "success", "error_message": null } ], "total_count": 2, "success_count": 2, "error_count": 0, "type_": "GN", "timestamp": "2024-01-15 14:30:25" } ``` """ try: from datetime import datetime # 批量查询实时值 results = [] success_count = 0 error_count = 0 for point_name in request.point_names: result = query_single_realtime_value(point_name, request.type_) if result["status"] == "success": success_count += 1 else: error_count += 1 results.append( RealtimeDataPoint( point_name=result["point_name"], value=result["value"], status=result["status"], error_message=result["error_message"], ) ) # 获取当前时间戳 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info( f"多测点实时值查询完成: 总数={len(request.point_names)}, 成功={success_count}, 失败={error_count}" ) return MultiRealtimeResponse( results=results, total_count=len(request.point_names), success_count=success_count, error_count=error_count, type_=request.type_, timestamp=timestamp, ) except Exception as e: logger.error(f"多测点实时值查询过程中发生错误: {str(e)}") raise HTTPException( status_code=500, detail={ "error_type": "系统错误", "message": f"多测点实时值查询失败: {str(e)}", "suggestions": [ "检查测点名称列表格式", "确认测点数量不超过50个", "检查系统状态", "稍后重试", ], }, )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boyzhang666/data-analysys-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server