openplant.py•20.9 kB
from .scr.OPAPI_36 import *
import pandas as pd
from typing import List
class OpenPlant:
"""
- 适用于麦杰实时数据库
- 对操作语句的二次封装
- 支持执行sql语句的增删改查
- 支持高级封装语法的增删改查
"""
def __init__(
self, host: int, port: int, user="sis", timeout=60, password="openplant"
):
self.host = host
self.port = port
self.user = user
self.timeout = timeout
self.password = password
# 执行 sql 语句
def api_sql(self, sql_code: str):
"""
### OPIO(SQL增删改查)
"""
con = Connect(self.host, self.port, self.timeout, self.user, self.password)
resultSet = con.executeQuery(sql_code)
try:
list = []
while resultSet.Next(): # Next()执行一次, 游标下移一行
colNum = resultSet.columnsNum # 获取列个数
dict = {}
for i in range(colNum):
colName = resultSet.columnLabel(i) # 获取第i列名字
colValue = resultSet.getValue(i) # 获取第i列值
dict.update({colName: colValue})
list.append(dict)
except Exception as e:
print("error:", e)
finally:
resultSet.close()
con.close()
return list
# 查看实时值
def api_select_realtime(self, Name: str, type_="GN"):
"""
### OPIO(查看实时值)
"""
# 建立连接
con = Connect(self.host, self.port, self.timeout, self.user, self.password)
resultSet = con.executeQuery(
"select av from Realtime where " + type_ + " = " + "'" + Name + "'"
)
colValue = None # 初始化变量
try:
while resultSet.Next(): # Next()执行一次, 游标下移一行
colNum = resultSet.columnsNum # 获取列个数
for i in range(colNum):
colValue = resultSet.getValue(i) # 获取第i列值
except Exception as e:
print("error:", e)
finally:
resultSet.close()
con.close()
return colValue
# 查询
def api_select(
self,
TableName,
ColNames,
Keys,
LastHours: int = 3,
Mode: str = "arch",
Interval: int = 1,
):
"""
### OPIO(查询)
### 参数
- TableName: 'Point' or 'REALTIME' or 'ARCHIVE'
- ColNames: ['GN', 'TM', 'AV']
- Keys: ['DEFAULT.NODE1.X7']
"""
con = Connect(self.host, self.port, self.timeout, self.user, self.password)
if TableName in ["POINT", "Point", "point", "REALTIME", "Realtime"]:
resultSet = con.select(TableName, ColNames, Keys, None)
try:
list = []
while resultSet.Next(): # Next()执行一次, 游标下移一行
colNum = resultSet.columnsNum # 获取列个数
dict = {}
for i in range(colNum):
colName = resultSet.columnLabel(i) # 获取第i列名字
colValue = resultSet.getValue(i) # 获取第i列值
dict.update({colName: colValue})
list.append(dict)
except Exception as e:
print("error:", e)
finally:
resultSet.close()
elif TableName in ["ARCHIVE", "Archive"]:
end = datetime.datetime.now()
begin = end + datetime.timedelta(hours=-LastHours)
options = {
"end": end,
"begin": begin,
"mode": Mode,
"interval": Interval,
"qtype": 0,
}
resultSet = con.select(TableName, ColNames, Keys, options)
try:
list = []
while resultSet.Next(): # Next()执行一次, 游标下移一行
colNum = resultSet.columnsNum # 获取列个数
dict = {}
for i in range(colNum):
colName = resultSet.columnLabel(i) # 获取第i列名字
colValue = resultSet.getValue(i) # 获取第i列值
dict.update({colName: colValue})
list.append(dict)
except Exception as e:
print("error:", e)
finally:
resultSet.close()
else:
raise ValueError("TableName must be true")
con.close()
return list
# 选择历史数据
def api_select_archive(
self, global_name: str, mode: str = "avg", interval=None, length: int = 1
) -> list[dict]:
"""选择点的历史平均值、流量值、最大值、最小值
Args:
global_name (str): _description_
mode (str, optional): "flow", "max", "min", "span", "raw". Defaults to "avg".
interval (_type_, optional): "1m", "1s",. Defaults to None.
length (int, optional): _description_. Defaults to 3.
Returns:
list[dict]: _description_
"""
end_time = self.api_sql(
f" select TM from Realtime where gn = '{global_name}' "
)[0]["TM"]
start_time = end_time - datetime.timedelta(minutes=length)
end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
if interval == None:
result = self.api_sql(
f" select TM, AV from Archive where gn = '{global_name}' and mode = '{mode}' and TM between '{start_time}' and '{end_time}' "
)
else:
result = self.api_sql(
f" select TM, AV from Archive where gn = '{global_name}' and mode = '{mode}' and interval = '{interval}' and TM between '{start_time}' and '{end_time}' "
)
return result
def api_select_to_frame(
self,
point_list: List[str],
start_time: str,
end_time: str,
mode: str = "span",
interval: str = "3m",
fill_method: str = "outer",
) -> pd.DataFrame:
"""
从API获取时间序列数据, 并将结果合并为DataFrame.
### 参数:
- point_list: List[str], 要查询的数据点列表.
- start_time: str, 查询的开始时间, 格式为'YYYY-MM-DD HH:MM:SS'.
- end_time: str, 查询的结束时间, 格式为'YYYY-MM-DD HH:MM:SS'.
- mode: str, 查询模式.
- interval: str, 数据点采样间隔, 如'3m'表示每3分钟.
- fill_method: str, 数据合并方式,'outer'保留所有时间点,'inner'只保留共同时间点.
### 返回:
- pd.DataFrame, 包含所有数据点时间序列的合并结果, 时间戳列名为'TM'.
### 异常处理:
- 当某个点无数据时,该点列将填充为NaN
- 当所有点都无数据时,返回空DataFrame但包含正确的列名
"""
# 存储每个点的数据
point_data_dict = {}
successful_points = []
failed_points = []
# 逐个查询每个点的数据
for point_name in point_list:
try:
# 构造SQL查询字符串
sql_code = f"SELECT AV, TM FROM Archive WHERE mode='{mode}' AND interval='{interval}' AND GN = '{point_name}' AND TM BETWEEN '{start_time}' AND '{end_time}'"
query_result = self.api_sql(sql_code)
# 检查查询结果
if not query_result:
failed_points.append(point_name)
point_data_dict[point_name] = pd.Series(
dtype=float, name=point_name
)
continue
# 处理有数据的情况
data = pd.DataFrame(query_result)
# 处理时间列
if not pd.api.types.is_datetime64_any_dtype(data["TM"]):
data["TM"] = pd.to_datetime(data["TM"])
# 处理重复时间戳
if data["TM"].duplicated().any():
data = data.drop_duplicates(subset=["TM"], keep="last")
# 设置时间索引并重命名列
data.set_index("TM", inplace=True)
data.rename(columns={"AV": point_name}, inplace=True)
# 确保数值列为数值类型
data[point_name] = pd.to_numeric(data[point_name], errors="coerce")
point_data_dict[point_name] = data[point_name]
successful_points.append(point_name)
except Exception as e:
failed_points.append(point_name)
point_data_dict[point_name] = pd.Series(dtype=float, name=point_name)
# 输出统计信息和处理无数据情况
if len(successful_points) == 0:
print(
f"警告:所有 {len(point_list)} 个点在时间范围 {start_time} 到 {end_time} 内都无数据"
)
return pd.DataFrame(columns=point_list, index=pd.Index([], name="TM"))
else:
print(f"成功获取数据的点: {len(successful_points)} 个")
if failed_points:
print(
f"无数据或查询失败的点: {len(failed_points)} 个 - {failed_points}"
)
# 合并数据
try:
join_type = "inner" if fill_method.lower() == "inner" else "outer"
finally_data = pd.concat(point_data_dict.values(), axis=1, join=join_type)
# 确保列的顺序与输入一致,按时间排序,设置索引名称
finally_data = finally_data.reindex(columns=point_list).sort_index()
finally_data.index.name = "TM"
except Exception as e:
print(f"数据合并时发生错误: {e}")
finally_data = pd.DataFrame(
columns=point_list, index=pd.Index([], name="TM")
)
return finally_data
def api_search_points_by_description(
self,
description_keyword: str,
search_fields: List[str] = ["ED", "GN"],
limit: int = None,
exact_match: bool = False,
) -> List[dict]:
"""
通过描述关键词搜索相关的点名
### 参数:
- description_keyword: str, 搜索的关键词
- search_fields: List[str], 要搜索的字段列表,默认为["ED", "GN"],ED为点描述,GN为点名
- limit: int, 限制返回结果数量,None表示不限制
- exact_match: bool, 是否精确匹配,False表示模糊匹配
### 返回:
- List[dict], 包含匹配的点信息列表,每个字典包含GN、ED等字段
### 示例:
```python
# 模糊搜索包含"温度"的点
points = openplant.api_search_points_by_description("温度")
# 精确匹配点名
points = openplant.api_search_points_by_description("DEFAULT.NODE1.X7", exact_match=True)
# 只搜索点描述字段,限制返回10条
points = openplant.api_search_points_by_description("压力", search_fields=["ED"], limit=10)
```
"""
try:
# 构建WHERE条件
where_conditions = []
if exact_match:
# 精确匹配
for field in search_fields:
where_conditions.append(f"{field} = '{description_keyword}'")
else:
# 模糊匹配
for field in search_fields:
where_conditions.append(f"{field} LIKE '%{description_keyword}%'")
# 用OR连接多个字段的搜索条件
where_clause = " OR ".join(where_conditions)
# 构建SQL查询语句
sql_code = f"SELECT GN, ED, RT, ID FROM Point WHERE {where_clause}"
# 添加排序,优先显示点名匹配的结果
sql_code += (
" ORDER BY CASE WHEN GN LIKE '%{0}%' THEN 1 ELSE 2 END, GN".format(
description_keyword
)
)
# 添加限制条件
if limit is not None and limit > 0:
sql_code += f" LIMIT {limit}"
# 执行查询
result = self.api_sql(sql_code)
return result
except Exception as e:
print(f"搜索点名时发生错误: {e}")
return []
def api_get_all_points_info(self, limit: int = None) -> List[dict]:
"""
获取所有点的基本信息
### 参数:
- limit: int, 限制返回结果数量,None表示不限制
### 返回:
- List[dict], 包含所有点信息的列表
"""
try:
sql_code = "SELECT GN, ED, RT, ID FROM Point"
if limit is not None and limit > 0:
sql_code += f" LIMIT {limit}"
result = self.api_sql(sql_code)
return result
except Exception as e:
print(f"获取点信息时发生错误: {e}")
return []
def api_search_points_advanced(
self, filters: dict, logic_operator: str = "AND", limit: int = None
) -> List[dict]:
"""
高级点搜索功能,支持多条件组合查询
### 参数:
- filters: dict, 搜索条件字典,格式为 {字段名: {操作符: 值}}
- logic_operator: str, 多条件间的逻辑操作符,"AND" 或 "OR"
- limit: int, 限制返回结果数量
### 支持的操作符:
- "like": 模糊匹配
- "eq": 精确匹配
- "ne": 不等于
- "in": 包含在列表中
### 示例:
```python
# 搜索描述包含"温度"且点名包含"NODE1"的点
filters = {
"ED": {"like": "温度"},
"GN": {"like": "NODE1"}
}
points = openplant.api_search_points_advanced(filters, "AND")
# 搜索特定ID列表的点
filters = {
"ID": {"in": [1001, 1002, 1003]}
}
points = openplant.api_search_points_advanced(filters)
```
"""
try:
where_conditions = []
for field, condition in filters.items():
for operator, value in condition.items():
if operator == "like":
where_conditions.append(f"{field} LIKE '%{value}%'")
elif operator == "eq":
where_conditions.append(f"{field} = '{value}'")
elif operator == "ne":
where_conditions.append(f"{field} != '{value}'")
elif operator == "in":
if isinstance(value, (list, tuple)):
value_str = "', '".join(map(str, value))
where_conditions.append(f"{field} IN ('{value_str}')")
else:
where_conditions.append(f"{field} = '{value}'")
if not where_conditions:
return self.api_get_all_points_info(limit)
# 用指定的逻辑操作符连接条件
where_clause = f" {logic_operator} ".join(where_conditions)
sql_code = f"SELECT GN, ED, RT, ID FROM Point WHERE {where_clause}"
if limit is not None and limit > 0:
sql_code += f" LIMIT {limit}"
result = self.api_sql(sql_code)
return result
except Exception as e:
print(f"高级搜索时发生错误: {e}")
return []
# 写入实时值
def api_write_realtime(self, ID: int, GN: str, Value, RealTime=None):
"""
### OPIO(写入实时值)
"""
if RealTime is None:
time_now = datetime.datetime.now()
RealTime = datetime.datetime.strftime(time_now, "'%Y-%m-%d %H:%M:%S'")
else:
RealTime = RealTime
WriteCode = "insert into Realtime values " + str((ID, GN, RealTime, 0, Value))
con = Connect(self.host, self.port, self.timeout, self.user, self.password)
resultSet = con.executeQuery(WriteCode)
resultSet.close()
con.close()
# 新增点、插入值
def api_insert(self, TableName: str, ColNames: list, Rows: list):
"""
### OPIO(新增点、插入值(必须有点))
### 参数
- TableName: 'POINT' or 'REALTIME'
- ColNames: 列集合 = ['GN', 'RT', 'ED']
- Rows: 对应ColNames的多个点的嵌套列表
"""
con = Connect(self.host, self.port, self.timeout, self.user, self.password)
resultSet = con.insert(TableName, ColNames, Rows) # 执行插入
try:
if not resultSet.isHaveWall():
while resultSet.Next():
colNum = resultSet.columnsNum
for i in range(colNum):
colN = resultSet.columnLabel(i)
colV = resultSet.getValue(i)
print(colN, ":", colV)
else:
print("穿透隔离器成功")
except Exception as e:
print("error:", e)
finally:
resultSet.close()
con.close()
# 删除点
def api_delete(self, TableName: str, ColNames: list, Keys: list):
"""
### OPIO(删除点)
### 参数
- TableName: "Point",
- ColNames: ["GN"]
- Keys: ['DEFAULT.NODE1.X7']
"""
con = Connect(self.host, self.port, self.timeout, self.user, self.password)
resultSet = con.delete(TableName, ColNames, Keys)
try:
while resultSet.Next():
colNum = resultSet.columnsNum
for i in range(colNum):
colN = resultSet.columnLabel(i)
colV = resultSet.getValue(i)
print(colN, ":", colV)
except Exception as e:
print("error:", e)
finally:
resultSet.close()
con.close()
# 更新
def api_update(self, TableName: str, ColNames: list, Rows: list):
"""
### OPIO(更新)
### 参数
- TableName: "POINT" or "REALTIME"
- ColNames: ["GN", "ED", "AV"]
- Rows: 多个点对应ColNames的嵌套列表, [["DEFAULT.NODE1.X7", "test", 3]]
"""
con = Connect(self.host, self.port, self.timeout, self.user, self.password)
resultSet = con.update(TableName, ColNames, Rows)
try:
if not resultSet.isHaveWall():
while resultSet.Next():
colNum = resultSet.columnsNum
for i in range(colNum):
colN = resultSet.columnLabel(i)
colV = resultSet.getValue(i)
# print(colN, ':', colV)
else:
print("穿透隔离器成功")
except Exception as e:
print("error:", e)
finally:
resultSet.close()
con.close()
def __onCallback(self, owner, response):
"回调函数, 订阅信息推送会调用该函数"
if response != None:
io = IO()
result = io.get_table(response)
error = io.get_errno(response)
if error == 0:
resultSet = ResultSet(None, None, response)
colCount = resultSet.columnCount()
rowCount = resultSet.rowCount()
for j in range(rowCount):
io.set_rowid(result, j)
for i in range(colCount):
colLab = resultSet.columnLabel(i)
colValue = resultSet.getValue(i)
print("cloLab:", colLab, "colValue:", colValue)
resultSet.close()
def __asyncSubscribe(self, ahObject, GNs: list):
"动态订阅"
time.sleep(60)
ahObject.add(GNs)
# ahObject.remove(GNs)
# 异步订阅
def api_async_(self, tableName, IDs, GNs):
"""
### OPIO(异步订阅)
### 参数
- TableName = 'Realtime' # 订阅实时表
- IDs = [84633408]
- GNs = ['DEFAULT.NODE1.X7']
"""
import _thread
global cb
global cbRealtime
cb = CFUNCTYPE(None, c_void_p, c_void_p)
cbRealtime = cb(self.__onCallback)
con = Connect(self.host, self.port, self.timeout, self.user, self.password)
asyncHandle = con.openAsync(tableName, cbRealtime, IDs) # 开始订阅
_thread.start_new_thread(self.__asyncSubscribe, (asyncHandle, GNs))
con.close() # 关闭连接