# OPAPI_36.py (35.4 kB)
from ctypes import *
import datetime
import time
import numpy
import platform
from pathlib import Path
# Directory two levels above this script; the native library is looked up in its "lib" folder.
script_path = Path(__file__).parent.parent
# Pick the native library matching the current platform (and CPU on other
# systems).  The path is always handed to ctypes as ``str``: path-like
# ``name`` arguments are only documented for the ctypes loaders from
# Python 3.12 onwards.
if platform.system() == 'Windows':
    library_path = str(script_path / 'lib' / 'opapi4.dll')
    api = WinDLL(library_path)
elif platform.system() == 'Darwin':
    library_path = script_path / 'lib' / 'libopapi4.dylib'
    api = CDLL(str(library_path))
else:
    if platform.machine() == 'aarch64':
        library_path = script_path / 'lib' / 'libopapi4_aarch64.so'
    else:
        library_path = script_path / 'lib' / 'libopapi4_x86_64.so'
    api = CDLL(str(library_path))
class IO:
    """Low-level ctypes bindings for the native OPAPI ``op2_*`` functions.

    Each method declares the ``restype``/``argtypes`` of one function on the
    module-level ``api`` library object and then calls it.  ``str`` arguments
    are encoded to bytes before they are handed to the native side.  Pointer
    results declared as ``c_void_p`` come back as ``int`` (or ``None`` for a
    NULL pointer), which is how handles are passed between methods.
    """

    @staticmethod
    def _encode(value):
        """Return *value* encoded to bytes when it is a ``str``; otherwise unchanged."""
        return value.encode() if isinstance(value, str) else value

    def init(
        self, option, host, port, timeout, user, password, buffer_path, buffer_size
    ):
        """Call native ``op2_init`` and return the pointer it yields."""
        api.op2_init.restype = c_void_p
        api.op2_init.argtypes = [
            c_int,
            c_char_p,
            c_int,
            c_int,
            c_char_p,
            c_char_p,
            c_char_p,
            c_int,
        ]
        return api.op2_init(
            c_int(option),
            c_char_p(IO._encode(host)),
            c_int(port),
            c_int(timeout),
            c_char_p(IO._encode(user)),
            c_char_p(IO._encode(password)),
            c_char_p(IO._encode(buffer_path)),
            c_int(buffer_size),
        )

    def close(self, op):
        """Call native ``op2_close`` on the handle *op*."""
        api.op2_close.argtypes = [c_void_p]
        api.op2_close(c_void_p(op))

    def status(self, op):
        """Return the int result of native ``op2_status`` for *op*."""
        api.op2_status.restype = c_int
        api.op2_status.argtypes = [c_void_p]
        return api.op2_status(c_void_p(op))

    def get_system_time(self, op, out):
        """Call native ``op2_get_system_time``; *out* is a ``c_int`` filled by reference.

        Returns the int result of the native call.
        """
        # The signature used to be declared on ``op2_init`` by mistake, which
        # left this function unconfigured and clobbered ``op2_init``'s setup.
        api.op2_get_system_time.restype = c_int
        api.op2_get_system_time.argtypes = [c_void_p, POINTER(c_int)]
        return api.op2_get_system_time(c_void_p(op), byref(out))

    def try_connect(self, op):
        """Return the int result of native ``op2_try_connect`` for *op*."""
        api.op2_try_connect.restype = c_int
        api.op2_try_connect.argtypes = [c_void_p]
        return api.op2_try_connect(c_void_p(op))

    def new_table(self, name):
        """Call native ``op2_new_table`` with *name* and return the table pointer."""
        api.op2_new_table.restype = c_void_p
        api.op2_new_table.argtypes = [c_char_p]
        return api.op2_new_table(c_char_p(IO._encode(name)))

    def new_row(self, table):
        """Call native ``op2_new_row`` for *table* and return the row pointer."""
        api.op2_new_row.restype = c_void_p
        api.op2_new_row.argtypes = [c_void_p]
        return api.op2_new_row(c_void_p(table))

    def free_table(self, table):
        """Call native ``op2_free_table`` on *table*."""
        api.op2_free_table.argtypes = [c_void_p]
        api.op2_free_table(c_void_p(table))

    def add_column(self, table, name, type_, length, mask, defval, expr):
        """Call native ``op2_add_column`` and return its int result."""
        api.op2_add_column.restype = c_int
        api.op2_add_column.argtypes = [
            c_void_p,
            c_char_p,
            c_int,
            c_int,
            c_int,
            c_char_p,
            c_char_p,
        ]
        return api.op2_add_column(
            c_void_p(table),
            c_char_p(IO._encode(name)),
            c_int(type_),
            c_int(length),
            c_int(mask),
            c_char_p(IO._encode(defval)),
            c_char_p(IO._encode(expr)),
        )

    def column_count(self, table):
        """Return the int result of native ``op2_column_count`` for *table*."""
        api.op2_column_count.restype = c_int
        api.op2_column_count.argtypes = [c_void_p]
        return api.op2_column_count(c_void_p(table))

    def value_type(self, table, col):
        """Return the int result of native ``op2_value_type`` for column *col*."""
        api.op2_value_type.restype = c_int
        api.op2_value_type.argtypes = [c_void_p, c_int]
        return api.op2_value_type(c_void_p(table), c_int(col))

    def column_name(self, table, col):
        """Return the result of native ``op2_column_name``, decoded to ``str`` when it is bytes."""
        api.op2_column_name.restype = c_char_p
        api.op2_column_name.argtypes = [c_void_p, c_int]
        res = api.op2_column_name(c_void_p(table), c_int(col))
        if isinstance(res, bytes):
            res = res.decode()
        return res

    def column_index(self, table, name):
        """Return the int result of native ``op2_column_index`` for *name*."""
        api.op2_column_index.restype = c_int
        api.op2_column_index.argtypes = [c_void_p, c_char_p]
        return api.op2_column_index(c_void_p(table), c_char_p(IO._encode(name)))

    def append_row(self, table):
        """Return the int result of native ``op2_append_row`` for *table*."""
        api.op2_append_row.restype = c_int
        api.op2_append_row.argtypes = [c_void_p]
        return api.op2_append_row(c_void_p(table))

    def new_request(self):
        """Call native ``op2_new_request`` and return the request pointer."""
        api.op2_new_request.restype = c_void_p
        return api.op2_new_request()

    def free_request(self, r):
        """Call native ``op2_free_request`` on the request *r*."""
        api.op2_free_request.argtypes = [c_void_p]
        api.op2_free_request(c_void_p(r))

    def get_stream(self, openplant):
        """Call native ``op2_get_stream`` for *openplant* and return the stream pointer."""
        api.op2_get_stream.restype = c_void_p
        api.op2_get_stream.argtypes = [c_void_p]
        return api.op2_get_stream(c_void_p(openplant))

    def set_compress(self, opio, zip_):
        """Call native ``op2_set_compress`` on the stream *opio* with flag *zip_*."""
        api.op2_set_compress.argtypes = [c_void_p, c_int]
        api.op2_set_compress(c_void_p(opio), c_int(zip_))

    def write_request(self, opio, r):
        """Return the int result of native ``op2_write_request``."""
        api.op2_write_request.restype = c_int
        api.op2_write_request.argtypes = [c_void_p, c_void_p]
        return api.op2_write_request(c_void_p(opio), c_void_p(r))

    def write_content(self, opio, t):
        """Return the int result of native ``op2_write_content``."""
        api.op2_write_content.restype = c_int
        api.op2_write_content.argtypes = [c_void_p, c_void_p]
        return api.op2_write_content(c_void_p(opio), c_void_p(t))

    def flush_content(self, opio):
        """Return the int result of native ``op2_flush_content``."""
        api.op2_flush_content.restype = c_int
        api.op2_flush_content.argtypes = [c_void_p]
        return api.op2_flush_content(c_void_p(opio))

    def get_response(self, opio, r):
        """Call native ``op2_get_response``; *r* is a ``c_void_p`` filled by reference.

        Returns the int result of the native call.
        """
        api.op2_get_response.restype = c_int
        api.op2_get_response.argtypes = [c_void_p, POINTER(c_void_p)]
        return api.op2_get_response(c_void_p(opio), byref(r))

    def has_wall(self, r):
        """Return the int result of native ``op2_has_wall`` for the response *r*."""
        api.op2_has_wall.restype = c_int
        api.op2_has_wall.argtypes = [c_void_p]
        return api.op2_has_wall(r)

    def next_content(self, opio, result, clear, eof):
        """Call native ``op2_next_content`` and return its int result.

        *eof* may be a ``c_int`` instance, in which case the native side
        writes into it and the caller can read ``eof.value`` afterwards.  A
        plain ``int`` is still accepted, but is copied into a temporary, so
        whatever the native call writes is not visible to the caller.
        """
        api.op2_next_content.restype = c_int
        api.op2_next_content.argtypes = [c_void_p, c_void_p, c_int, POINTER(c_int)]
        eof_cell = eof if isinstance(eof, c_int) else c_int(eof)
        return api.op2_next_content(
            c_void_p(opio), c_void_p(result), c_int(clear), byref(eof_cell)
        )

    def get_error(self, r):
        """Return the ``bytes`` (or ``None``) result of native ``op2_get_error``."""
        api.op2_get_error.restype = c_char_p
        api.op2_get_error.argtypes = [c_void_p]
        return api.op2_get_error(r)

    def get_errno(self, r):
        """Return the int result of native ``op2_get_errno`` for the response *r*."""
        api.op2_get_errno.restype = c_int
        api.op2_get_errno.argtypes = [c_void_p]
        return api.op2_get_errno(r)

    def free_response(self, r):
        """Call native ``op2_free_response`` on the response *r*."""
        api.op2_free_response.argtypes = [c_void_p]
        api.op2_free_response(r)

    def set_rowid(self, table, rowid):
        """Return the int result of native ``op2_set_rowid``."""
        api.op2_set_rowid.restype = c_int
        api.op2_set_rowid.argtypes = [c_void_p, c_int]
        return api.op2_set_rowid(c_void_p(table), c_int(rowid))

    def append(self, table, row):
        """Return the int result of native ``op2_append`` for *row* on *table*."""
        api.op2_append.restype = c_int
        api.op2_append.argtypes = [c_void_p, c_void_p]
        return api.op2_append(c_void_p(table), c_void_p(row))

    def row_count(self, table):
        """Return the int result of native ``op2_row_count`` for *table*."""
        api.op2_row_count.restype = c_int
        api.op2_row_count.argtypes = [c_void_p]
        return api.op2_row_count(c_void_p(table))

    def column_int(self, row, col):
        """Return the 64-bit int result of native ``op2_column_int``."""
        api.op2_column_int.restype = c_int64
        api.op2_column_int.argtypes = [c_void_p, c_int]
        return api.op2_column_int(c_void_p(row), c_int(col))

    def column_double(self, row, col):
        """Return the float result of native ``op2_column_double``."""
        api.op2_column_double.restype = c_double
        api.op2_column_double.argtypes = [c_void_p, c_int]
        return api.op2_column_double(c_void_p(row), c_int(col))

    def column_string(self, row, col):
        """Return the ``bytes`` (or ``None``) result of native ``op2_column_string``."""
        api.op2_column_string.restype = c_char_p
        api.op2_column_string.argtypes = [c_void_p, c_int]
        return api.op2_column_string(c_void_p(row), c_int(col))

    def column_bytes(self, row, col):
        """Return the int result of native ``op2_column_bytes``."""
        api.op2_column_bytes.restype = c_int
        api.op2_column_bytes.argtypes = [c_void_p, c_int]
        return api.op2_column_bytes(c_void_p(row), c_int(col))

    def column_binary(self, row, col):
        """Return the bytes at the pointer given by native ``op2_column_binary``.

        The number of bytes copied is whatever :meth:`column_bytes` reports
        for the same cell.  A NULL pointer yields ``b""``.
        """
        api.op2_column_binary.restype = c_void_p
        api.op2_column_binary.argtypes = [c_void_p, c_int]
        address = api.op2_column_binary(c_void_p(row), c_int(col))
        length = self.column_bytes(row, col)
        if not address:
            # Never dereference a NULL pointer.
            return b""
        return string_at(address, length)

    def column_type(self, row, col):
        """Return the int result of native ``op2_value_type`` for column *col*.

        NOTE(review): this class used to define ``column_type`` twice; the
        later definition (this one, calling ``op2_value_type``) replaced an
        earlier one that called ``op2_column_type``.  The effective behaviour
        is kept here - confirm which native function is really intended.
        """
        api.op2_value_type.restype = c_int
        api.op2_value_type.argtypes = [c_void_p, c_int]
        return api.op2_value_type(c_void_p(row), c_int(col))

    def bind_bool(self, row, col, value):
        """Call native ``op2_bind_bool`` with *value* passed as a C int."""
        api.op2_bind_bool.argtypes = [c_void_p, c_int, c_int]
        api.op2_bind_bool(c_void_p(row), c_int(col), c_int(value))

    def bind_int8(self, row, col, value, mask=-1):
        """Call native ``op2_bind_int8`` with *value* and *mask*."""
        api.op2_bind_int8.argtypes = [c_void_p, c_int, c_int, c_int64]
        api.op2_bind_int8(c_void_p(row), c_int(col), c_int(value), c_int64(mask))

    def bind_int16(self, row, col, value, mask=-1):
        """Call native ``op2_bind_int16`` with *value* and *mask*."""
        api.op2_bind_int16.argtypes = [c_void_p, c_int, c_int, c_int64]
        api.op2_bind_int16(c_void_p(row), c_int(col), c_int(value), c_int64(mask))

    def bind_int32(self, row, col, value, mask=-1):
        """Call native ``op2_bind_int32`` with *value* and *mask*."""
        api.op2_bind_int32.argtypes = [c_void_p, c_int, c_int, c_int64]
        api.op2_bind_int32(c_void_p(row), c_int(col), c_int(value), c_int64(mask))

    def bind_int(self, row, col, value, mask=-1):
        """Call native ``op2_bind_int`` with a 64-bit *value* and *mask*."""
        api.op2_bind_int.argtypes = [c_void_p, c_int, c_int64, c_int64]
        api.op2_bind_int(c_void_p(row), c_int(col), c_int64(value), c_int64(mask))

    def bind_float(self, row, col, value):
        """Call native ``op2_bind_float`` with *value* as a C float."""
        api.op2_bind_float.argtypes = [c_void_p, c_int, c_float]
        api.op2_bind_float(c_void_p(row), c_int(col), c_float(value))

    def bind_double(self, row, col, value):
        """Call native ``op2_bind_double`` with *value* as a C double."""
        api.op2_bind_double.argtypes = [c_void_p, c_int, c_double]
        api.op2_bind_double(c_void_p(row), c_int(col), c_double(value))

    def bind_string(self, row, col, value):
        """Call native ``op2_bind_string``; a ``str`` *value* is encoded first."""
        api.op2_bind_string.argtypes = [c_void_p, c_int, c_char_p]
        api.op2_bind_string(c_void_p(row), c_int(col), c_char_p(IO._encode(value)))

    def bind_binary(self, row, col, value, len_):
        """Call native ``op2_bind_binary`` with the pointer *value* and length *len_*."""
        api.op2_bind_binary.argtypes = [c_void_p, c_int, c_void_p, c_int]
        api.op2_bind_binary(c_void_p(row), c_int(col), c_void_p(value), c_int(len_))

    def set_table(self, r, t):
        """Call native ``op2_set_table`` to attach table *t* to request *r*."""
        api.op2_set_table.argtypes = [c_void_p, c_void_p]
        api.op2_set_table(c_void_p(r), c_void_p(t))

    def get_table(self, r):
        """Call native ``op2_get_table`` for *r* and return the table pointer."""
        api.op2_get_table.restype = c_void_p
        api.op2_get_table.argtypes = [c_void_p]
        return api.op2_get_table(r)

    def set_option(self, r, key, value):
        """Call native ``op2_set_option`` with *key* and *value* on request *r*."""
        api.op2_set_option.argtypes = [c_void_p, c_char_p, c_char_p]
        api.op2_set_option(
            c_void_p(r), c_char_p(IO._encode(key)), c_char_p(IO._encode(value))
        )

    def get_option(self, r, key, buffer_, len_):
        """Return the ``bytes`` (or ``None``) result of native ``op2_get_option``."""
        api.op2_get_option.restype = c_char_p
        api.op2_get_option.argtypes = [c_void_p, c_char_p, c_char_p, c_int]
        return api.op2_get_option(
            c_void_p(r),
            c_char_p(IO._encode(key)),
            c_char_p(IO._encode(buffer_)),
            c_int(len_),
        )

    def set_indices(self, r, name, count, keys):
        """Call native ``op2_set_indices`` with *keys* copied into a 64-bit int array of *count* items."""
        array_type = c_int64 * count
        keys_ = array_type(*keys)
        api.op2_set_indices.restype = None
        api.op2_set_indices.argtypes = [
            c_void_p,
            c_char_p,
            c_int,
            POINTER(array_type),
        ]
        api.op2_set_indices(
            c_void_p(r), c_char_p(IO._encode(name)), c_int(count), pointer(keys_)
        )

    def set_indices_string(self, r, name, count, keys):
        """Call native ``op2_set_indices_string`` with *keys* copied into a C string array of *count* items.

        Keys may be ``str`` (encoded to bytes) or already ``bytes``.
        """
        array_type = c_char_p * count
        keys_ = array_type(*[IO._encode(key) for key in keys])
        api.op2_set_indices_string.restype = None
        api.op2_set_indices_string.argtypes = [
            c_void_p,
            c_char_p,
            c_int,
            POINTER(array_type),
        ]
        api.op2_set_indices_string(
            c_void_p(r), c_char_p(IO._encode(name)), c_int(count), pointer(keys_)
        )

    def add_filter(self, req, l, op, r, rel):
        """Call native ``op2_add_filter`` with operands *l*/*r* and the codes *op*/*rel*."""
        api.op2_add_filter.restype = None
        api.op2_add_filter.argtypes = [c_void_p, c_char_p, c_int, c_char_p, c_int]
        api.op2_add_filter(
            c_void_p(req),
            c_char_p(IO._encode(l)),
            c_int(op),
            c_char_p(IO._encode(r)),
            c_int(rel),
        )

    def open_async(self, op, r, cb, owner, error):
        """Call native ``op2_open_async`` and return the pointer it yields.

        *cb* is handed to the native call as-is and *error* is passed by
        reference.  No ``argtypes`` are declared for this function, so the
        arguments are converted by ctypes' default rules.
        """
        api.op2_open_async.restype = c_void_p
        return api.op2_open_async(
            c_void_p(op), c_void_p(r), cb, c_void_p(owner), byref(error)
        )

    def close_async(self, ah):
        """Call native ``op2_close_async`` on the handle *ah*."""
        api.op2_close_async.argtypes = [c_void_p]
        api.op2_close_async(c_void_p(ah))

    def async_subscribe(self, ah, count, ids, onoff):
        """Return the int result of native ``op2_async_subscribe`` for the 64-bit int *ids*."""
        array_type = c_int64 * count
        keys_ = array_type(*ids)
        api.op2_async_subscribe.restype = c_int
        api.op2_async_subscribe.argtypes = [
            c_void_p,
            c_int,
            POINTER(array_type),
            c_int,
        ]
        return api.op2_async_subscribe(
            c_void_p(ah), c_int(count), pointer(keys_), c_int(onoff)
        )

    def async_subscribe_tags(self, ah, count, tags, onoff):
        """Return the int result of native ``op2_async_subscribe_tags`` for the string *tags*.

        Tags may be ``str`` (encoded to bytes) or already ``bytes``.
        """
        array_type = c_char_p * count
        keys_ = array_type(*[IO._encode(tag) for tag in tags])
        api.op2_async_subscribe_tags.restype = c_int
        api.op2_async_subscribe_tags.argtypes = [
            c_void_p,
            c_int,
            POINTER(array_type),
            c_int,
        ]
        return api.op2_async_subscribe_tags(
            c_void_p(ah), c_int(count), pointer(keys_), c_int(onoff)
        )

    # ***opapi2 start***
    def get_history_byname(
        self, op, gh, value_types, begin_tm, end_tm, interval, result, errors
    ):
        """Return the int result of native ``op2_get_history_byname``.

        *value_types* is copied into a C int array; *result* and *errors*
        are ctypes objects passed by reference.
        """
        count = len(value_types)
        array_type = c_int * count
        typs_ = array_type(*value_types)
        api.op2_get_history_byname.restype = c_int
        api.op2_get_history_byname.argtypes = [
            c_void_p,
            c_void_p,
            POINTER(array_type),
            c_int,
            c_int,
            c_int,
            POINTER(c_void_p),
            POINTER(array_type),
        ]
        return api.op2_get_history_byname(
            c_void_p(op),
            c_void_p(gh),
            pointer(typs_),
            c_int(begin_tm),
            c_int(end_tm),
            c_int(interval),
            byref(result),
            byref(errors),
        )
    # ***opapi2 end***
class Enum(tuple):
    """Tuple of names in which attribute access yields the name's position.

    Looking up a name that is not in the tuple raises ``ValueError`` (from
    ``tuple.index``), not ``AttributeError``.
    """

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        return self.index(name)
# Value-type names; a name's position in the tuple is the code it stands for
# (e.g. ``Type.vtNull == 0``, ``Type.vtBool == 1``).
Type = Enum(
    (
        "vtNull vtBool vtInt8 vtInt16 vtInt32 vtInt64 vtFloat "
        "vtDouble vtDateTime vtString vtBinary vtObject vtArray vtMap"
    ).split()
)
class eType:
    """Commonly used data type codes."""

    Null, Bool = 0, 1
    # Int and Long share one code.
    Int = Long = 5
    Float = 7
    Datetime, String, Binary, Object = 8, 9, 10, 11
class Connect(IO):
    """Connection class built on the :class:`IO` bindings.

    ``option`` is forwarded to ``op2_init``: 0 means isolation is detected
    automatically, 1 forces isolation.
    """

    actionInsert = 1
    actionDelete = 2
    actionUpdate = 3
    actionSelect = 4

    def __init__(self, ip, port, timeout, user, password, option=0):
        # Pre-set so close()/__del__ stay safe if init() raises.
        self.op = None
        self.op = self.init(option, ip, port, timeout, user, password, None, 0)

    def __del__(self):
        self.close()

    def __useKeyFindType(self, x):
        """Map a column type name to an :class:`eType` code (``eType.String`` when unknown)."""
        return {
            "int": eType.Int,
            "text": eType.String,
            "datetime": eType.Datetime,
            "float": eType.Float,
            "double": eType.Float,
            "tinyint": eType.Int,
            "smallint": eType.Int,
            "bigint": eType.Int,
            "blob": eType.Binary,
        }.get(x, eType.String)

    @staticmethod
    def _format_timestamp(value):
        """Render a ``datetime`` as ``"<epoch seconds>.<milliseconds>"``.

        Milliseconds are truncated rather than rounded: rounding a fraction
        of .9995 or more up to "1.000" and then dropping the integer digit
        produced a timestamp that was about one second too early.
        """
        seconds = int(time.mktime(value.timetuple()))
        return "%d.%03d" % (seconds, value.microsecond // 1000)

    def _bind_value(self, row, col, value):
        """Bind one cell value to *row* using the binder that matches its Python type."""
        if value is None:
            self.bind_binary(row, col, None, 0)
        elif isinstance(value, (int, numpy.integer, numpy.bool_)):
            # numpy integer scalars are not ``int`` instances; convert explicitly.
            self.bind_int(row, col, int(value))
        elif isinstance(value, (float, numpy.floating)):
            self.bind_double(row, col, float(value))
        elif isinstance(value, datetime.datetime):
            self.bind_string(row, col, self._format_timestamp(value))
        else:
            self.bind_string(row, col, value)

    def close(self):
        """Close the connection (safe to call more than once)."""
        op = getattr(self, "op", None)
        if op is not None:
            IO.close(self, op)
            self.op = None

    def isAlive(self):
        """Return True when a handle exists and ``status`` reports 0 for it."""
        return (self.op is not None) and (self.status(self.op) == 0)

    def reconnect(self):
        """Try to reconnect; return True when ``try_connect`` reports 0."""
        return (self.op is not None) and (self.try_connect(self.op) == 0)

    def __execute(self, op, request):
        """Send *request* over the stream of *op* and wrap the response in a ResultSet.

        Raises ``Exception`` when writing fails, when no response can be
        read, or when the response carries a non-zero error number.  On any
        failure the request (and the response, if one was received) is freed
        here, since no ResultSet exists yet to free them on close.
        """
        response = c_void_p(None)
        try:
            table_get = self.get_table(request)
            opio = self.get_stream(op)
            self.set_compress(opio, 1)
            rv = self.write_request(opio, request)
            if rv == 0 and table_get is not None:
                rv = self.write_content(opio, table_get)
            # Flush only after successful writes so that a write_content
            # failure is not masked by the result of the flush.
            if rv == 0:
                rv = self.flush_content(opio)
            if rv != 0:
                raise Exception("write request err")
            if self.get_response(opio, response) != 0:
                raise Exception("get response err")
            if self.get_errno(response) != 0:
                message = self.get_error(response)
                if isinstance(message, bytes):
                    message = message.decode(errors="replace")
                raise Exception(message)
        except Exception:
            self.free_request(request)
            if response.value is not None:
                self.free_response(response)
            raise
        return ResultSet(opio, request, response)

    def executeQuery(self, sql):
        """Send an SQL request and return its ResultSet."""
        if self.op is None:
            raise Exception("op_init error")
        request = self.new_request()
        self.set_option(request, "Reqid", "1")
        self.set_option(request, "Action", "ExecSQL")
        self.set_option(request, "SQL", sql)
        return self.__execute(self.op, request)

    def __modify(self, action, tableName, colNames, rows_):
        """Build a table from *rows_* and send it as an Insert or Update request.

        *rows_* may be a list/tuple of rows or a 2-D ``numpy.ndarray``.
        Sequences are converted with ``dtype=object`` so every cell keeps its
        Python type; without it numpy coerces the cells to one common type.
        """
        if self.op is None:
            raise Exception("op_init error")
        if isinstance(rows_, (list, tuple)):
            rows = numpy.array(rows_, dtype=object)
        elif isinstance(rows_, numpy.ndarray):
            rows = rows_
        else:
            raise Exception("rows type error")
        rowCount = rows.shape[0]
        colCount = rows.shape[1]
        table = self.new_table(tableName)
        for i in range(colCount):
            self.add_column(table, colNames[i], eType.Object, 0, 0, None, None)
        for i in range(rowCount):
            row = self.new_row(table)
            for j in range(colCount):
                self._bind_value(row, j, rows[i][j])
            self.append(table, row)
        request = self.new_request()
        self.set_table(request, table)
        self.set_option(request, "Reqid", "1")
        if action == Connect.actionInsert:
            self.set_option(request, "Action", "Insert")
        elif action == Connect.actionUpdate:
            self.set_option(request, "Action", "Update")
        return self.__execute(self.op, request)

    def __findOrDelete(self, action, tableName, colNames, keys, options):
        """Send a Select or Delete request for *keys* and return its ResultSet.

        For Delete each key becomes one row with the key bound to column 0.
        For Select the keys are set as request indices (ints or strings,
        decided by the first key) and *options* entries become request
        options, with ``datetime`` values rendered as timestamp strings.
        """
        if self.op is None:
            raise Exception("op_init error")
        # Validate before any native object is allocated.
        if (
            action == Connect.actionSelect
            and options is not None
            and not isinstance(options, dict)
        ):
            raise Exception("options type must dict")
        table = self.new_table(tableName)
        for name in colNames:
            self.add_column(table, name, eType.Object, 0, 0, None, None)
        if action == Connect.actionDelete:
            try:
                for key in keys:
                    row = self.new_row(table)
                    if isinstance(key, int):
                        self.bind_int(row, 0, key)
                    elif isinstance(key, float):
                        self.bind_double(row, 0, key)
                    else:
                        self.bind_string(row, 0, key)
                    self.append(table, row)
            except Exception as err:
                raise Exception("keys type must consistent") from err
        request = self.new_request()
        self.set_table(request, table)
        self.set_option(request, "Reqid", "1")
        if action == Connect.actionSelect:
            self.set_option(request, "Action", "Select")
            if isinstance(options, dict):
                for key, value in options.items():
                    if isinstance(value, datetime.datetime):
                        text = self._format_timestamp(value)
                    else:
                        text = str(value)
                    self.set_option(request, key, text)
            # With no keys (or keys of another type) no indices are set.
            first = keys[0] if keys else None
            if isinstance(first, int):
                self.set_indices(request, None, len(keys), keys)
            elif isinstance(first, str):
                self.set_indices_string(request, None, len(keys), keys)
        elif action == Connect.actionDelete:
            self.set_option(request, "Action", "Delete")
        return self.__execute(self.op, request)

    def insert(self, tableName, colNames, rows):
        """Insert *rows* (list, tuple or ndarray) into *tableName* through OPIO."""
        if (
            not isinstance(tableName, str)
            or not isinstance(colNames, (list, tuple))
            or not isinstance(rows, (list, tuple, numpy.ndarray))
        ):
            raise Exception("insert parameter error")
        return self.__modify(Connect.actionInsert, tableName, colNames, rows)

    def delete(self, tableName, colNames, keys):
        """Delete the rows identified by *keys* from *tableName* through OPIO."""
        if (
            not isinstance(tableName, str)
            or not isinstance(colNames, (list, tuple))
            or not isinstance(keys, (list, tuple))
        ):
            raise Exception("delete parameter error")
        return self.__findOrDelete(
            Connect.actionDelete, tableName, colNames, keys, None
        )

    def update(self, tableName, colNames, rows):
        """Update *tableName* with *rows* (list, tuple or ndarray) through OPIO."""
        if (
            not isinstance(tableName, str)
            or not isinstance(colNames, (list, tuple))
            or not isinstance(rows, (list, tuple, numpy.ndarray))
        ):
            raise Exception("update parameter error")
        return self.__modify(Connect.actionUpdate, tableName, colNames, rows)

    def select(self, tableName, colNames, keys, options):
        """Query *tableName* for *keys* through OPIO; *options* is a dict or empty."""
        if (
            not isinstance(tableName, str)
            or not isinstance(colNames, (list, tuple))
            or not isinstance(keys, (list, tuple))
            or (options and not isinstance(options, dict))
        ):
            raise Exception("select parameter error")
        return self.__findOrDelete(
            Connect.actionSelect, tableName, colNames, keys, options
        )

    def openAsync(self, tableName, cb, keys):
        """Open an asynchronous subscription on *tableName* and return an Async handle.

        *keys* must be a non-empty list or tuple whose first element decides
        the index kind: ``str`` keys are set under "GN", ``int`` keys under
        "ID".  *cb* is passed to ``open_async`` as the callback.
        """
        if self.op is None:
            raise Exception("op_init error")
        # Validate the keys before allocating any native object so that a
        # bad argument cannot leak a request or table.
        if not isinstance(keys, (list, tuple)):
            raise Exception("keys type must be list or tuple")
        keyLen = len(keys)
        if keyLen <= 0:
            raise Exception("keys num is zero")
        by_name = isinstance(keys[0], str)
        if not by_name and not isinstance(keys[0], int):
            raise Exception("key must be ID or GN")
        async_request = self.new_request()
        table = self.new_table(tableName)
        self.add_column(table, "*", eType.Null, 0, 0, None, None)
        self.set_table(async_request, table)
        if by_name:
            self.set_indices_string(async_request, "GN", keyLen, keys)
        else:
            self.set_indices(async_request, "ID", keyLen, keys)
        self.set_option(async_request, "Snapshot", "1")
        error = c_int(-1)
        ah = self.open_async(self.op, async_request, cb, None, error)
        if ah is None:
            if async_request is not None:
                self.free_request(async_request)
            raise Exception("open_async error:" + str(error.value))
        return Async(ah)
class ResultSet(IO):
    """Result class: a forward cursor over the table carried by a response."""

    def __init__(self, opio, request, response):
        # Store the handles first so close()/__del__ work even if one of the
        # native calls below raises.
        self.request = request
        self.response = response
        self.opio = opio
        self.pos = 0
        self.rowsNum = 0
        self.columnsNum = 0
        self.hasWall = self.has_wall(response) == 1
        self.tableResult = self.get_table(response)

    def __del__(self):
        self.close()

    def __nextContent(self):
        """Load the next chunk of rows into the result table.

        Returns True when the chunk holds at least one row.  After a failed
        ``next_content`` the stream reference is dropped, and later calls
        return False instead of handing a NULL stream to the native library.
        """
        if self.opio is None:
            return False
        # NOTE(review): eof is passed as a plain int, so whatever the native
        # call writes to it is not observed here; the end of the data is
        # detected through the row count instead.
        rv = self.next_content(self.opio, self.tableResult, 1, 0)
        if rv != 0:
            self.opio = None
        self.pos = 0
        self.rowsNum = self.row_count(self.tableResult)
        self.columnsNum = self.column_count(self.tableResult)
        return self.pos < self.rowsNum

    def Next(self):
        """Move the cursor down one row; return False when ``hasWall`` is set or no row is left."""
        if self.hasWall:
            return False
        self.pos += 1
        if self.pos >= self.rowsNum and not self.__nextContent():
            return False
        self.set_rowid(self.tableResult, self.pos)
        return True

    def isHaveWall(self):
        """Return the ``hasWall`` flag computed from the response at construction."""
        return self.hasWall

    def _resolve_column(self, keyOrColumn):
        """Turn a column name or position into a column index.

        Names go through ``column_index`` (whose result is returned as-is);
        ints are returned unchanged; anything else falls back to column 0.
        """
        if isinstance(keyOrColumn, str):
            return self.column_index(self.tableResult, keyOrColumn)
        if isinstance(keyOrColumn, int):
            return keyOrColumn
        return 0

    @staticmethod
    def _simplify_type(code):
        """Collapse a raw type code onto the :class:`eType` codes.

        Codes 1-4 map to ``eType.Int``, 5 to ``eType.Long``, 6-7 to
        ``eType.Float``; every other code is returned unchanged.
        """
        if 1 <= code < 5:
            return eType.Int
        if code == 5:
            return eType.Long
        if 6 <= code <= 7:
            return eType.Float
        return code

    def getString(self, keyOrColumn):
        """Return the cell as ``str`` by column position or name (None if the index is -1)."""
        col = self._resolve_column(keyOrColumn)
        if col == -1:
            return None
        val = self.column_string(self.tableResult, col)
        if isinstance(val, bytes):
            val = val.decode()
        return val

    def getInt(self, keyOrColumn):
        """Return the cell as an integer by column position or name (None if the index is -1)."""
        col = self._resolve_column(keyOrColumn)
        if col == -1:
            return None
        return self.column_int(self.tableResult, col)

    def getFloat(self, keyOrColumn):
        """Return the cell as a float by column position or name (None if the index is -1)."""
        col = self._resolve_column(keyOrColumn)
        if col == -1:
            return None
        return self.column_double(self.tableResult, col)

    def getDateTime(self, keyOrColumn):
        """Return the cell as a ``datetime`` by column position or name (None if the index is -1).

        The cell is read as a double and converted with
        ``datetime.fromtimestamp``.
        """
        col = self._resolve_column(keyOrColumn)
        if col == -1:
            return None
        timestamp = self.column_double(self.tableResult, col)
        return datetime.datetime.fromtimestamp(timestamp)

    def getValue(self, keyOrColumn):
        """Return the cell converted according to its actual value type.

        Bool and integer types come back as ``int``, float types as
        ``float``, date-times as ``datetime``, binaries as ``bytes`` and
        everything else as a string (None if the index is -1).
        """
        col = self._resolve_column(keyOrColumn)
        if col == -1:
            return None
        type_ = self.value_type(self.tableResult, col)
        if type_ in (
            Type.vtBool,
            Type.vtInt8,
            Type.vtInt16,
            Type.vtInt32,
            Type.vtInt64,
        ):
            return self.column_int(self.tableResult, col)
        if type_ in (Type.vtFloat, Type.vtDouble):
            return self.column_double(self.tableResult, col)
        if type_ == Type.vtDateTime:
            stamp = self.column_double(self.tableResult, col)
            return datetime.datetime.fromtimestamp(stamp)
        if type_ == Type.vtBinary:
            return self.column_binary(self.tableResult, col)
        value = self.column_string(self.tableResult, col)
        if isinstance(value, bytes):
            value = value.decode()
        return value

    def columnLabel(self, column):
        """Return the field name of the int position *column* (None for other argument types)."""
        if isinstance(column, int):
            return self.column_name(self.tableResult, column)
        return None

    def columnCount(self):
        """Return the number of columns in the result table."""
        return self.column_count(self.tableResult)

    def rowCount(self):
        """Return the number of rows currently in the result table."""
        return self.row_count(self.tableResult)

    def columnType(self, column):
        """Return the column type, collapsed onto the :class:`eType` codes."""
        return self._simplify_type(self.column_type(self.tableResult, column))

    def valueType(self, column):
        """Return the value type, collapsed onto the :class:`eType` codes."""
        return self._simplify_type(self.value_type(self.tableResult, column))

    def close(self):
        """Free the request and the response (safe to call more than once)."""
        request = getattr(self, "request", None)
        if request is not None:
            self.free_request(request)
            self.request = None
        response = getattr(self, "response", None)
        if response is not None:
            self.free_response(response)
            self.response = None
class Async(IO):
    """Asynchronous subscription class wrapping the handle returned by ``open_async``."""

    def __init__(self, ah):
        self.ah = ah

    def close(self):
        """Close the subscription (safe to call more than once)."""
        if self.ah is not None:
            self.close_async(self.ah)
            self.ah = None

    def __subscribe(self, keys, onoff):
        """Subscribe (*onoff* = 1) or unsubscribe (*onoff* = 0) *keys*.

        *keys* must be a non-empty list or tuple; the first element decides
        whether the keys are treated as tag names (``str``) or ids (``int``).
        Returns the int result of the native subscribe call.
        """
        # Check the container type first so that an unsized argument gets
        # the intended message instead of a TypeError from len().
        if not isinstance(keys, (list, tuple)):
            raise Exception("keys type must be list or tuple")
        keyLen = len(keys)
        if keyLen <= 0:
            raise Exception("keys num is zero")
        if isinstance(keys[0], str):
            return self.async_subscribe_tags(self.ah, keyLen, keys, onoff)
        if isinstance(keys[0], int):
            return self.async_subscribe(self.ah, keyLen, keys, onoff)
        raise Exception("key must be ID or GN")

    def add(self, keys):
        """Add *keys* to the subscription; returns the native result code."""
        return self.__subscribe(keys, 1)

    def remove(self, keys):
        """Remove *keys* from the subscription; returns the native result code."""
        return self.__subscribe(keys, 0)