Skip to main content
Glama

Data Analysis MCP Server

by boyzhang666
openplant.py20.9 kB
from .scr.OPAPI_36 import *

# Explicit stdlib imports: the star import above is expected to re-export
# these names, but relying on that is fragile.
import datetime
import time
from ctypes import CFUNCTYPE, c_void_p
from typing import List

import pandas as pd


class OpenPlant:
    """Client wrapper for the Magus real-time database (麦杰实时数据库).

    Second-level encapsulation of the OPAPI operation statements:

    - raw SQL CRUD via :meth:`api_sql`
    - higher-level select / insert / update / delete / subscribe helpers
    """

    def __init__(
        self, host: int, port: int, user="sis", timeout=60, password="openplant"
    ):
        # NOTE(review): `host` is annotated int but is passed straight to
        # Connect(); confirm whether it is actually an IP/host string.
        self.host = host
        self.port = port
        self.user = user
        self.timeout = timeout
        self.password = password

    def _connect(self):
        """Open a new OPAPI connection using this instance's credentials."""
        return Connect(self.host, self.port, self.timeout, self.user, self.password)

    @staticmethod
    def _fetch_rows(result_set) -> List[dict]:
        """Drain *result_set* into a list of {column: value} dicts.

        Best-effort, mirroring the original behaviour: an error mid-iteration
        is printed and the rows collected so far are returned.  The result set
        is always closed.
        """
        rows: List[dict] = []
        try:
            while result_set.Next():  # each Next() advances the cursor one row
                row = {}
                for i in range(result_set.columnsNum):
                    row[result_set.columnLabel(i)] = result_set.getValue(i)
                rows.append(row)
        except Exception as e:
            print("error:", e)
        finally:
            result_set.close()
        return rows

    @staticmethod
    def _echo_rows(result_set):
        """Print every column:value pair of *result_set* (does not close it)."""
        while result_set.Next():
            for i in range(result_set.columnsNum):
                print(result_set.columnLabel(i), ":", result_set.getValue(i))

    # 执行 sql 语句
    def api_sql(self, sql_code: str) -> List[dict]:
        """OPIO (SQL CRUD): execute *sql_code* and return rows as dicts.

        Fixes over the previous version: the connection is closed even when
        executeQuery() raises (before, `resultSet`/`list` were also unbound
        in that case), and the builtins ``list``/``dict`` are no longer
        shadowed.
        """
        con = self._connect()
        try:
            return self._fetch_rows(con.executeQuery(sql_code))
        finally:
            con.close()

    # 查看实时值
    def api_select_realtime(self, Name: str, type_="GN"):
        """OPIO: return the realtime value (last ``av`` cell read) of a point.

        Returns None when the query yields no rows.

        NOTE(review): the statement is built by string concatenation; *Name*
        and *type_* must come from trusted callers (OPAPI offers no parameter
        binding).
        """
        value = None
        con = self._connect()
        try:
            result_set = con.executeQuery(
                "select av from Realtime where " + type_ + " = " + "'" + Name + "'"
            )
            try:
                while result_set.Next():
                    for i in range(result_set.columnsNum):
                        value = result_set.getValue(i)  # keep the last value seen
            except Exception as e:
                print("error:", e)
            finally:
                result_set.close()
        finally:
            con.close()
        return value

    # 查询
    def api_select(
        self,
        TableName,
        ColNames,
        Keys,
        LastHours: int = 3,
        Mode: str = "arch",
        Interval: int = 1,
    ) -> List[dict]:
        """OPIO select against the Point / Realtime / Archive tables.

        Args:
            TableName: 'Point' / 'REALTIME' / 'ARCHIVE' (several spellings
                accepted, exactly as before).
            ColNames: e.g. ['GN', 'TM', 'AV'].
            Keys: e.g. ['DEFAULT.NODE1.X7'].
            LastHours: ARCHIVE only - how many hours back from now to query.
            Mode: ARCHIVE only - query mode.
            Interval: ARCHIVE only - sampling interval.

        Returns:
            List of {column: value} dicts.

        Raises:
            ValueError: when TableName is not one of the accepted names.
        """
        con = self._connect()
        try:
            if TableName in ("POINT", "Point", "point", "REALTIME", "Realtime"):
                options = None
            elif TableName in ("ARCHIVE", "Archive"):
                end = datetime.datetime.now()
                begin = end + datetime.timedelta(hours=-LastHours)
                options = {
                    "end": end,
                    "begin": begin,
                    "mode": Mode,
                    "interval": Interval,
                    "qtype": 0,
                }
            else:
                # Was: "TableName must be true" - meaningless message.
                raise ValueError("TableName must be one of POINT/REALTIME/ARCHIVE")
            return self._fetch_rows(con.select(TableName, ColNames, Keys, options))
        finally:
            # Previously the connection leaked when TableName was invalid.
            con.close()

    # 选择历史数据
    def api_select_archive(
        self, global_name: str, mode: str = "avg", interval=None, length: int = 1
    ) -> list[dict]:
        """Query a point's archived avg/flow/max/min/span/raw values.

        Args:
            global_name: fully qualified point name (GN).
            mode: "avg", "flow", "max", "min", "span" or "raw".
            interval: e.g. "1m", "1s"; None omits the interval filter.
            length: window length in minutes, ending at the point's latest
                realtime timestamp. Defaults to 1 (the old docstring wrongly
                said 3).

        Returns:
            List of {"TM": ..., "AV": ...} rows.
        """
        # Anchor the window on the point's latest realtime timestamp.
        end_time = self.api_sql(
            f" select TM from Realtime where gn = '{global_name}' "
        )[0]["TM"]
        start_time = end_time - datetime.timedelta(minutes=length)
        end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
        start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
        if interval is None:  # was `== None`
            return self.api_sql(
                f" select TM, AV from Archive where gn = '{global_name}' and mode = '{mode}' and TM between '{start_time}' and '{end_time}' "
            )
        return self.api_sql(
            f" select TM, AV from Archive where gn = '{global_name}' and mode = '{mode}' and interval = '{interval}' and TM between '{start_time}' and '{end_time}' "
        )

    def api_select_to_frame(
        self,
        point_list: List[str],
        start_time: str,
        end_time: str,
        mode: str = "span",
        interval: str = "3m",
        fill_method: str = "outer",
    ) -> pd.DataFrame:
        """Fetch time series for several points and merge them into a DataFrame.

        Args:
            point_list: point names (GN) to query.
            start_time / end_time: 'YYYY-MM-DD HH:MM:SS'.
            mode: archive query mode.
            interval: sampling interval, e.g. '3m'.
            fill_method: 'outer' keeps all timestamps, 'inner' only common ones.

        Returns:
            DataFrame indexed by 'TM' with one column per point.  Points with
            no data become all-NaN columns; when no point has data an empty
            DataFrame with the right columns is returned.
        """
        point_data_dict = {}
        successful_points = []
        failed_points = []

        for point_name in point_list:
            try:
                sql_code = (
                    f"SELECT AV, TM FROM Archive WHERE mode='{mode}' AND interval='{interval}' "
                    f"AND GN = '{point_name}' AND TM BETWEEN '{start_time}' AND '{end_time}'"
                )
                query_result = self.api_sql(sql_code)

                if not query_result:
                    failed_points.append(point_name)
                    point_data_dict[point_name] = pd.Series(
                        dtype=float, name=point_name
                    )
                    continue

                data = pd.DataFrame(query_result)

                # Normalise the timestamp column to datetime64.
                if not pd.api.types.is_datetime64_any_dtype(data["TM"]):
                    data["TM"] = pd.to_datetime(data["TM"])

                # Drop duplicate timestamps, keeping the latest record.
                if data["TM"].duplicated().any():
                    data = data.drop_duplicates(subset=["TM"], keep="last")

                data.set_index("TM", inplace=True)
                data.rename(columns={"AV": point_name}, inplace=True)

                # Coerce the value column to numeric (bad values -> NaN).
                data[point_name] = pd.to_numeric(data[point_name], errors="coerce")

                point_data_dict[point_name] = data[point_name]
                successful_points.append(point_name)
            except Exception:
                # Best-effort: a failing point becomes an empty column.
                failed_points.append(point_name)
                point_data_dict[point_name] = pd.Series(dtype=float, name=point_name)

        if len(successful_points) == 0:
            print(
                f"警告:所有 {len(point_list)} 个点在时间范围 {start_time} 到 {end_time} 内都无数据"
            )
            return pd.DataFrame(columns=point_list, index=pd.Index([], name="TM"))

        print(f"成功获取数据的点: {len(successful_points)} 个")
        if failed_points:
            print(
                f"无数据或查询失败的点: {len(failed_points)} 个 - {failed_points}"
            )

        try:
            join_type = "inner" if fill_method.lower() == "inner" else "outer"
            finally_data = pd.concat(point_data_dict.values(), axis=1, join=join_type)
            # Restore the input column order, sort by time, name the index.
            finally_data = finally_data.reindex(columns=point_list).sort_index()
            finally_data.index.name = "TM"
        except Exception as e:
            print(f"数据合并时发生错误: {e}")
            finally_data = pd.DataFrame(
                columns=point_list, index=pd.Index([], name="TM")
            )

        return finally_data

    def api_search_points_by_description(
        self,
        description_keyword: str,
        search_fields: List[str] = None,
        limit: int = None,
        exact_match: bool = False,
    ) -> List[dict]:
        """Search points by a keyword over the given fields.

        Args:
            description_keyword: text to search for.
            search_fields: fields to search; defaults to ["ED", "GN"]
                (ED = point description, GN = point name).  The previous
                mutable-list default argument has been replaced with None.
            limit: max number of rows, None for unlimited.
            exact_match: True for equality, False for LIKE matching.

        Returns:
            Matching rows with GN, ED, RT, ID; [] on error.

        Example::

            points = openplant.api_search_points_by_description("温度")
            points = openplant.api_search_points_by_description(
                "DEFAULT.NODE1.X7", exact_match=True
            )

        NOTE(review): the keyword is interpolated into SQL unescaped - keep
        inputs trusted.
        """
        if search_fields is None:
            search_fields = ["ED", "GN"]
        try:
            if exact_match:
                where_conditions = [
                    f"{field} = '{description_keyword}'" for field in search_fields
                ]
            else:
                where_conditions = [
                    f"{field} LIKE '%{description_keyword}%'"
                    for field in search_fields
                ]

            # Conditions on different fields are OR-ed together.
            where_clause = " OR ".join(where_conditions)
            sql_code = f"SELECT GN, ED, RT, ID FROM Point WHERE {where_clause}"
            # Rank point-name matches first.
            sql_code += (
                f" ORDER BY CASE WHEN GN LIKE '%{description_keyword}%' THEN 1 ELSE 2 END, GN"
            )
            if limit is not None and limit > 0:
                sql_code += f" LIMIT {limit}"

            return self.api_sql(sql_code)
        except Exception as e:
            print(f"搜索点名时发生错误: {e}")
            return []

    def api_get_all_points_info(self, limit: int = None) -> List[dict]:
        """Return basic info (GN, ED, RT, ID) for all points.

        Args:
            limit: max number of rows, None for unlimited.

        Returns:
            List of point-info dicts; [] on error.
        """
        try:
            sql_code = "SELECT GN, ED, RT, ID FROM Point"
            if limit is not None and limit > 0:
                sql_code += f" LIMIT {limit}"
            return self.api_sql(sql_code)
        except Exception as e:
            print(f"获取点信息时发生错误: {e}")
            return []

    def api_search_points_advanced(
        self, filters: dict, logic_operator: str = "AND", limit: int = None
    ) -> List[dict]:
        """Advanced multi-condition point search.

        Args:
            filters: {field: {operator: value}} - supported operators are
                "like" (fuzzy), "eq", "ne" and "in" (list/tuple membership).
            logic_operator: "AND" or "OR" between conditions.
            limit: max number of rows.

        Returns:
            Matching rows; all points when *filters* is empty; [] on error.

        Example::

            filters = {"ED": {"like": "温度"}, "GN": {"like": "NODE1"}}
            points = openplant.api_search_points_advanced(filters, "AND")
        """
        try:
            where_conditions = []
            for field, condition in filters.items():
                for operator, value in condition.items():
                    if operator == "like":
                        where_conditions.append(f"{field} LIKE '%{value}%'")
                    elif operator == "eq":
                        where_conditions.append(f"{field} = '{value}'")
                    elif operator == "ne":
                        where_conditions.append(f"{field} != '{value}'")
                    elif operator == "in":
                        if isinstance(value, (list, tuple)):
                            value_str = "', '".join(map(str, value))
                            where_conditions.append(f"{field} IN ('{value_str}')")
                        else:
                            # A scalar "in" degenerates to equality.
                            where_conditions.append(f"{field} = '{value}'")

            if not where_conditions:
                return self.api_get_all_points_info(limit)

            where_clause = f" {logic_operator} ".join(where_conditions)
            sql_code = f"SELECT GN, ED, RT, ID FROM Point WHERE {where_clause}"
            if limit is not None and limit > 0:
                sql_code += f" LIMIT {limit}"

            return self.api_sql(sql_code)
        except Exception as e:
            print(f"高级搜索时发生错误: {e}")
            return []

    # 写入实时值
    def api_write_realtime(self, ID: int, GN: str, Value, RealTime=None):
        """OPIO: write a realtime value for one point.

        When RealTime is None the current time is used, pre-wrapped in single
        quotes because the row is serialised with str(tuple) below.
        NOTE(review): a caller-supplied RealTime string is quoted by the tuple
        repr instead - verify both paths produce valid SQL for your server.
        """
        if RealTime is None:
            RealTime = datetime.datetime.now().strftime("'%Y-%m-%d %H:%M:%S'")
        write_code = "insert into Realtime values " + str((ID, GN, RealTime, 0, Value))
        con = self._connect()
        try:
            con.executeQuery(write_code).close()
        finally:
            con.close()

    # 新增点、插入值
    def api_insert(self, TableName: str, ColNames: list, Rows: list):
        """OPIO: create points / insert values (the points must already exist).

        Args:
            TableName: 'POINT' or 'REALTIME'.
            ColNames: column names, e.g. ['GN', 'RT', 'ED'].
            Rows: nested lists, one inner list per point, aligned with ColNames.
        """
        con = self._connect()
        try:
            result_set = con.insert(TableName, ColNames, Rows)
            try:
                if not result_set.isHaveWall():
                    self._echo_rows(result_set)
                else:
                    print("穿透隔离器成功")
            except Exception as e:
                print("error:", e)
            finally:
                result_set.close()
        finally:
            con.close()

    # 删除点
    def api_delete(self, TableName: str, ColNames: list, Keys: list):
        """OPIO: delete points.

        Args:
            TableName: e.g. "Point".
            ColNames: e.g. ["GN"].
            Keys: e.g. ['DEFAULT.NODE1.X7'].
        """
        con = self._connect()
        try:
            result_set = con.delete(TableName, ColNames, Keys)
            try:
                self._echo_rows(result_set)
            except Exception as e:
                print("error:", e)
            finally:
                result_set.close()
        finally:
            con.close()

    # 更新
    def api_update(self, TableName: str, ColNames: list, Rows: list):
        """OPIO: update point attributes / values.

        Args:
            TableName: "POINT" or "REALTIME".
            ColNames: e.g. ["GN", "ED", "AV"].
            Rows: nested lists aligned with ColNames, e.g.
                [["DEFAULT.NODE1.X7", "test", 3]].
        """
        con = self._connect()
        try:
            result_set = con.update(TableName, ColNames, Rows)
            try:
                if not result_set.isHaveWall():
                    # Drain the result set silently - the original read each
                    # cell but had its print commented out.
                    while result_set.Next():
                        for i in range(result_set.columnsNum):
                            result_set.columnLabel(i)
                            result_set.getValue(i)
                else:
                    print("穿透隔离器成功")
            except Exception as e:
                print("error:", e)
            finally:
                result_set.close()
        finally:
            con.close()

    def __onCallback(self, owner, response):
        """Callback invoked by the subscription push with a new response."""
        if response is not None:
            io = IO()
            result = io.get_table(response)
            error = io.get_errno(response)
            if error == 0:
                result_set = ResultSet(None, None, response)
                col_count = result_set.columnCount()
                row_count = result_set.rowCount()
                for j in range(row_count):
                    io.set_rowid(result, j)
                    for i in range(col_count):
                        # (label text "cloLab" kept verbatim from the original)
                        print(
                            "cloLab:",
                            result_set.columnLabel(i),
                            "colValue:",
                            result_set.getValue(i),
                        )
                result_set.close()

    def __asyncSubscribe(self, ahObject, GNs: list):
        """Dynamically add *GNs* to the subscription after a 60 s delay."""
        time.sleep(60)
        ahObject.add(GNs)
        # ahObject.remove(GNs)

    # 异步订阅
    def api_async_(self, tableName, IDs, GNs):
        """OPIO: subscribe asynchronously to value pushes.

        Args:
            tableName: e.g. 'Realtime' (subscribe to the realtime table).
            IDs: e.g. [84633408] - point ids subscribed immediately.
            GNs: e.g. ['DEFAULT.NODE1.X7'] - added dynamically after 60 s.
        """
        import _thread

        # Kept at module scope (as before) so the ctypes callback object is
        # not garbage-collected while the subscription is live.
        global cb
        global cbRealtime
        cb = CFUNCTYPE(None, c_void_p, c_void_p)
        cbRealtime = cb(self.__onCallback)
        con = self._connect()
        asyncHandle = con.openAsync(tableName, cbRealtime, IDs)  # start subscribing
        _thread.start_new_thread(self.__asyncSubscribe, (asyncHandle, GNs))
        # NOTE(review): the connection is closed right after subscribing, as
        # in the original - confirm the async handle survives con.close().
        con.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boyzhang666/data-analysys-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server