Skip to main content
Glama

Data Analysis MCP Server

by boyzhang666
OPAPI_36.py35.4 kB
from ctypes import * import datetime import time import numpy import platform from pathlib import Path # 获取当前脚本的路径 script_path = Path(__file__).parent.parent # 使用相对于脚本的路径计算动态库的路径 if platform.system() == 'Windows': library_path = str(Path(script_path) / 'lib' / 'opapi4.dll') api = WinDLL(library_path) elif platform.system() == 'Darwin': library_path = Path(script_path) / 'lib' / 'libopapi4.dylib' api = CDLL(library_path) else: if platform.machine() == 'aarch64': library_path = Path(script_path) / 'lib' / 'libopapi4_aarch64.so' else: library_path = Path(script_path) / 'lib' / 'libopapi4_x86_64.so' api = CDLL(library_path) class IO: "OPAPI外部接口" def init( self, option, host, port, timeout, user, password, buffer_path, buffer_size ): api.op2_init.restype = c_void_p api.op2_init.argtypes = [ c_int, c_char_p, c_int, c_int, c_char_p, c_char_p, c_char_p, c_int, ] if isinstance(host, str): host = str.encode(host) if isinstance(user, str): user = str.encode(user) if isinstance(password, str): password = str.encode(password) if isinstance(buffer_path, str): buffer_path = str.encode(buffer_path) res = api.op2_init( c_int(option), c_char_p(host), c_int(port), c_int(timeout), c_char_p(user), c_char_p(password), c_char_p(buffer_path), c_int(buffer_size), ) return res def close(self, op): api.op2_close.argtypes = [c_void_p] api.op2_close(c_void_p(op)) return def status(self, op): api.op2_status.restype = c_int api.op2_status.argtypes = [c_void_p] res = api.op2_status(c_void_p(op)) return res def get_system_time(self, op, out): api.op2_init.restype = c_int api.op2_init.argtypes = [c_void_p, POINTER(c_int)] res = api.op2_get_system_time(c_void_p(op), byref(out)) return res def try_connect(self, op): api.op2_try_connect.restype = c_int api.op2_try_connect.argtypes = [c_void_p] res = api.op2_try_connect(c_void_p(op)) return res def new_table(self, name): if isinstance(name, str): name = str.encode(name) api.op2_new_table.restype = c_void_p api.op2_new_table.argtypes = [c_char_p] res = api.op2_new_table(c_char_p(name)) return res def new_row(self, table): api.op2_new_row.restype = c_void_p api.op2_new_row.argtypes = [c_void_p] res = api.op2_new_row(c_void_p(table)) return res def free_table(self, table): api.op2_free_table.argtypes = [c_void_p] api.op2_free_table(c_void_p(table)) return def add_column(self, table, name, type_, length, mask, defval, expr): if isinstance(name, str): name = str.encode(name) if isinstance(defval, str): defval = str.encode(defval) if isinstance(expr, str): expr = str.encode(expr) api.op2_add_column.restype = c_int api.op2_add_column.argtypes = [ c_void_p, c_char_p, c_int, c_int, c_int, c_char_p, c_char_p, ] res = api.op2_add_column( c_void_p(table), c_char_p(name), c_int(type_), c_int(length), c_int(mask), c_char_p(defval), c_char_p(expr), ) return res def column_count(self, table): api.op2_column_count.restype = c_int api.op2_column_count.argtypes = [c_void_p] res = api.op2_column_count(c_void_p(table)) return res def column_type(self, table, col): api.op2_column_type.restype = c_int api.op2_column_type.argtypes = [c_void_p, c_int] res = api.op2_column_type(c_void_p(table), c_int(col)) return res def value_type(self, table, col): api.op2_value_type.restype = c_int api.op2_value_type.argtypes = [c_void_p, c_int] res = api.op2_value_type(c_void_p(table), c_int(col)) return res def column_name(self, table, col): api.op2_column_name.restype = c_char_p api.op2_column_name.argtypes = [c_void_p, c_int] res = api.op2_column_name(c_void_p(table), c_int(col)) if isinstance(res, bytes): res = bytes.decode(res) return res def column_index(self, table, name): if isinstance(name, str): name = str.encode(name) api.op2_column_index.restype = c_int api.op2_column_index.argtypes = [c_void_p, c_char_p] res = api.op2_column_index(c_void_p(table), c_char_p(name)) return res def append_row(self, table): api.op2_append_row.restype = c_int api.op2_append_row.argtypes = [c_void_p] res = api.op2_append_row(c_void_p(table)) return res def new_request(self): api.op2_new_request.restype = c_void_p res = api.op2_new_request() return res def free_request(self, r): api.op2_free_request.argtypes = [c_void_p] res = api.op2_free_request(c_void_p(r)) return def get_stream(self, openplant): api.op2_get_stream.restype = c_void_p api.op2_get_stream.argtypes = [c_void_p] res = api.op2_get_stream(c_void_p(openplant)) return res def set_compress(self, opio, zip_): api.op2_set_compress.argtypes = [c_void_p, c_int] api.op2_set_compress(c_void_p(opio), c_int(zip_)) return def write_request(self, opio, r): api.op2_write_request.restype = c_int api.op2_write_request.argtypes = [c_void_p, c_void_p] res = api.op2_write_request(c_void_p(opio), c_void_p(r)) return res def write_content(self, opio, t): api.op2_write_content.restype = c_int api.op2_write_content.argtypes = [c_void_p, c_void_p] res = api.op2_write_content(c_void_p(opio), c_void_p(t)) return res def flush_content(self, opio): api.op2_flush_content.restype = c_int api.op2_flush_content.argtypes = [c_void_p] res = api.op2_flush_content(c_void_p(opio)) return res def get_response(self, opio, r): api.op2_get_response.restype = c_int api.op2_get_response.argtypes = [c_void_p, POINTER(c_void_p)] res = api.op2_get_response(c_void_p(opio), byref(r)) return res def has_wall(self, r): api.op2_has_wall.restype = c_int api.op2_has_wall.argtypes = [c_void_p] res = api.op2_has_wall(r) return res def next_content(self, opio, result, clear, eof): api.op2_next_content.restype = c_int api.op2_next_content.argtypes = [c_void_p, c_void_p, c_int, POINTER(c_int)] res = api.op2_next_content( c_void_p(opio), c_void_p(result), c_int(clear), byref(c_int(eof)) ) return res def get_error(self, r): api.op2_get_error.restype = c_char_p api.op2_get_error.argtypes = [c_void_p] res = api.op2_get_error(r) return res def get_errno(self, r): api.op2_get_errno.restype = c_int api.op2_get_errno.argtypes = [c_void_p] res = api.op2_get_errno(r) return res def free_response(self, r): api.op2_free_response.argtypes = [c_void_p] api.op2_free_response(r) return def set_rowid(self, table, rowid): api.op2_set_rowid.restype = c_int api.op2_set_rowid.argtypes = [c_void_p, c_int] res = api.op2_set_rowid(c_void_p(table), c_int(rowid)) return res def append(self, table, row): api.op2_append.restype = c_int api.op2_append.argtypes = [c_void_p, c_void_p] res = api.op2_append(c_void_p(table), c_void_p(row)) return res def row_count(self, table): api.op2_row_count.restype = c_int api.op2_row_count.argtypes = [c_void_p] res = api.op2_row_count(c_void_p(table)) return res def column_int(self, row, col): api.op2_column_int.restype = c_int64 api.op2_column_int.argtypes = [c_void_p, c_int] res = api.op2_column_int(c_void_p(row), c_int(col)) return res def column_double(self, row, col): api.op2_column_double.restype = c_double api.op2_column_double.argtypes = [c_void_p, c_int] res = api.op2_column_double(c_void_p(row), c_int(col)) return res def column_string(self, row, col): api.op2_column_string.restype = c_char_p api.op2_column_string.argtypes = [c_void_p, c_int] res = api.op2_column_string(c_void_p(row), c_int(col)) return res def column_bytes(self, row, col): api.op2_column_bytes.restype = c_int api.op2_column_bytes.argtypes = [c_void_p, c_int] res = api.op2_column_bytes(c_void_p(row), c_int(col)) return res def column_binary(self, row, col): api.op2_column_binary.restype = c_void_p api.op2_column_binary.argtypes = [c_void_p, c_int] res = api.op2_column_binary(c_void_p(row), c_int(col)) bytesLen = self.column_bytes(row, col) values = string_at(res, bytesLen) return values def column_type(self, row, col): api.op2_value_type.restype = c_int api.op2_value_type.argtypes = [c_void_p, c_int] res = api.op2_value_type(c_void_p(row), c_int(col)) return res def bind_bool(self, row, col, value): api.op2_bind_bool.argtypes = [c_void_p, c_int, c_int] api.op2_bind_bool(c_void_p(row), c_int(col), c_int(value)) return def bind_int8(self, row, col, value, mask=-1): api.op2_bind_int8.argtypes = [c_void_p, c_int, c_int, c_int64] api.op2_bind_int8(c_void_p(row), c_int(col), c_int(value), c_int64(mask)) return def bind_int16(self, row, col, value, mask=-1): api.op2_bind_int16.argtypes = [c_void_p, c_int, c_int, c_int64] api.op2_bind_int16(c_void_p(row), c_int(col), c_int(value), c_int64(mask)) return def bind_int32(self, row, col, value, mask=-1): api.op2_bind_int32.argtypes = [c_void_p, c_int, c_int, c_int64] api.op2_bind_int32(c_void_p(row), c_int(col), c_int(value), c_int64(mask)) return def bind_int(self, row, col, value, mask=-1): api.op2_bind_int.argtypes = [c_void_p, c_int, c_int64, c_int64] api.op2_bind_int(c_void_p(row), c_int(col), c_int64(value), c_int64(mask)) return def bind_float(self, row, col, value): api.op2_bind_float.argtypes = [c_void_p, c_int, c_float] api.op2_bind_float(c_void_p(row), c_int(col), c_float(value)) return def bind_double(self, row, col, value): api.op2_bind_double.argtypes = [c_void_p, c_int, c_double] api.op2_bind_double(c_void_p(row), c_int(col), c_double(value)) return def bind_string(self, row, col, value): if isinstance(value, str): value = str.encode(value) api.op2_bind_string.argtypes = [c_void_p, c_int, c_char_p] api.op2_bind_string(c_void_p(row), c_int(col), c_char_p(value)) return def bind_binary(self, row, col, value, len_): api.op2_bind_binary.argtypes = [c_void_p, c_int, c_void_p, c_int] api.op2_bind_binary(c_void_p(row), c_int(col), c_void_p(value), c_int(len_)) return def set_table(self, r, t): api.op2_set_table.argtypes = [c_void_p, c_void_p] api.op2_set_table(c_void_p(r), c_void_p(t)) return def get_table(self, r): api.op2_get_table.restype = c_void_p api.op2_get_table.argtypes = [c_void_p] res = api.op2_get_table(r) return res def set_option(self, r, key, value): if isinstance(key, str): key = str.encode(key) if isinstance(value, str): value = str.encode(value) api.op2_set_option.argtypes = [c_void_p, c_char_p, c_char_p] api.op2_set_option(c_void_p(r), c_char_p(key), c_char_p(value)) return def get_option(self, r, key, buffer_, len_): if isinstance(key, str): key = str.encode(key) if isinstance(buffer_, str): buffer_ = str.encode(buffer_) api.op2_get_option.restype = c_char_p api.op2_get_option.argtypes = [c_void_p, c_char_p, c_char_p, c_int] res = api.op2_get_option( c_void_p(r), c_char_p(key), c_char_p(buffer_), c_int(len_) ) return res def set_indices(self, r, name, count, keys): if isinstance(name, str): name = str.encode(name) myArr_int64 = c_int64 * count keys_ = myArr_int64() for i, v in enumerate(keys): keys_[i] = v api.op2_set_indices.restype = None api.op2_set_indices.argtypes = [ c_void_p, c_char_p, c_int, POINTER(c_int64 * count), ] api.op2_set_indices(c_void_p(r), c_char_p(name), c_int(count), pointer(keys_)) return def set_indices_string(self, r, name, count, keys): if isinstance(name, str): name = str.encode(name) myArr_char_p = c_char_p * count keys_ = myArr_char_p() for i, v in enumerate(keys): keys_[i] = str.encode(v) api.op2_set_indices_string.restype = None api.op2_set_indices_string.argtypes = [ c_void_p, c_char_p, c_int, POINTER(c_char_p * count), ] api.op2_set_indices_string( c_void_p(r), c_char_p(name), c_int(count), pointer(keys_) ) return def add_filter(self, req, l, op, r, rel): if isinstance(l, str): l = str.encode(l) if isinstance(r, str): r = str.encode(r) api.op2_add_filter.restype = None api.op2_add_filter.argtypes = [c_void_p, c_char_p, c_int, c_char_p, c_int] api.op2_add_filter( c_void_p(req), c_char_p(l), c_int(op), c_char_p(r), c_int(rel) ) return def open_async(self, op, r, cb, owner, error): # cbType=CFUNCTYPE(None,c_void_p,c_void_p) api.op2_open_async.restype = c_void_p # api.op2_open_async.argtypes=[c_void_p,c_void_p,cbType,c_void_p,POINTER(int)] res = api.op2_open_async( c_void_p(op), c_void_p(r), cb, c_void_p(owner), byref(error) ) return res def close_async(self, ah): api.op2_close_async.argtypes = [c_void_p] api.op2_close_async(c_void_p(ah)) return def async_subscribe(self, ah, count, ids, onoff): myArr_int64 = c_int64 * count keys_ = myArr_int64() for i, v in enumerate(ids): keys_[i] = v api.op2_async_subscribe.restype = c_int api.op2_async_subscribe.argtypes = [ c_void_p, c_int, POINTER(c_int64 * count), c_int, ] res = api.op2_async_subscribe( c_void_p(ah), c_int(count), pointer(keys_), c_int(onoff) ) return res def async_subscribe_tags(self, ah, count, tags, onoff): myArr_char_p = c_char_p * count keys_ = myArr_char_p() for i, v in enumerate(tags): keys_[i] = str.encode(v) api.op2_async_subscribe_tags.restype = c_int api.op2_async_subscribe_tags.argtypes = [ c_void_p, c_int, POINTER(c_char_p * count), c_int, ] res = api.op2_async_subscribe_tags( c_void_p(ah), c_int(count), pointer(keys_), c_int(onoff) ) return res # ***opapi2 start*** def get_history_byname( self, op, gh, value_types, begin_tm, end_tm, interval, result, errors ): count = len(value_types) myArr_int = c_int * count typs_ = myArr_int() for i, v in enumerate(value_types): typs_[i] = v api.op2_get_history_byname.restype = c_int api.op2_get_history_byname.argtypes = [ c_void_p, c_void_p, POINTER(c_int * count), c_int, c_int, c_int, POINTER(c_void_p), POINTER(c_int * count), ] res = api.op2_get_history_byname( c_void_p(op), c_void_p(gh), pointer(typs_), c_int(begin_tm), c_int(end_tm), c_int(interval), byref(result), byref(errors), ) return res # ***opapi2 end*** class Enum(tuple): __getattr__ = tuple.index Type = Enum( [ "vtNull", "vtBool", "vtInt8", "vtInt16", "vtInt32", "vtInt64", "vtFloat", "vtDouble", "vtDateTime", "vtString", "vtBinary", "vtObject", "vtArray", "vtMap", ] ) class eType: "常用数据类型" Null = 0 Bool = 1 Int = 5 Long = 5 Float = 7 Datetime = 8 String = 9 Binary = 10 Object = 11 class Connect(IO): "连接类" actionInsert = 1 actionDelete = 2 actionUpdate = 3 actionSelect = 4 # option为0,表示自动判断隔离;为1,表示强制隔离 def __init__(self, ip, port, timeout, user, password, option=0): self.op = self.init(option, ip, port, timeout, user, password, None, 0) def __del__(self): self.close() def __useKeyFindType(self, x): return { "int": eType.Int, "text": eType.String, "datetime": eType.Datetime, "float": eType.Float, "double": eType.Float, "tinyint": eType.Int, "smallint": eType.Int, "bigint": eType.Int, "blob": eType.Binary, }.get(x, eType.String) def close(self): "关闭连接" if self.op != None: IO.close(self, self.op) self.op = None def isAlive(self): "判断当前连接是否断开" return (self.op != None) and (self.status(self.op) == 0) def reconnect(self): "连接重连" return (self.op != None) and (self.try_connect(self.op) == 0) def __execute(self, op, request): response = c_void_p(None) table_get = self.get_table(request) opio = self.get_stream(op) self.set_compress(opio, 1) rv = self.write_request(opio, request) if rv == 0: if table_get != None: rv = self.write_content(opio, table_get) rv = self.flush_content(opio) if rv != 0: raise Exception("write request err") rv = self.get_response(opio, response) if rv != 0: raise Exception("get response err") if self.get_errno(response) != 0: raise Exception(self.get_error(response)) resultSet = ResultSet(opio, request, response) return resultSet def executeQuery(self, sql): "SQL请求" if self.op == None: raise Exception("op_init error") request = self.new_request() self.set_option(request, "Reqid", "1") self.set_option(request, "Action", "ExecSQL") self.set_option(request, "SQL", sql) return self.__execute(self.op, request) def __modify(self, action, tableName, colNames, rows_): rows = None if self.op == None: raise Exception("op_init error") if isinstance(rows_, list): rows = numpy.array(rows_) elif isinstance(rows_, numpy.ndarray): rows = rows_ else: raise Exception("rows type error") tableName_getType = "" if tableName.find(".") > 0: tableName_getType = tableName.partition(".")[2] else: tableName_getType = tableName rowCount = rows.shape[0] colCount = rows.shape[1] table = self.new_table(tableName) try: for i in range(colCount): cName = colNames[i] self.add_column(table, cName, eType.Object, 0, 0, None, None) except Exception as e: raise Exception(e) try: for i in range(rowCount): row = self.new_row(table) for j in range(colCount): v = rows[i][j] if v == None: self.bind_binary(row, j, None, 0) elif isinstance(v, int) or isinstance(v, int): self.bind_int(row, j, v) elif isinstance(v, float): self.bind_double(row, j, v) elif isinstance(v, datetime.datetime): t = v.timetuple() timeStamp = int(time.mktime(t)) tStr = ( str(timeStamp) + str("%.3f" % (float(v.microsecond) / 1000000))[1:] ) self.bind_string(row, j, tStr) else: self.bind_string(row, j, v) self.append(table, row) except Exception as e: raise Exception(e) request = self.new_request() self.set_table(request, table) self.set_option(request, "Reqid", "1") if action == Connect.actionInsert: self.set_option(request, "Action", "Insert") elif action == Connect.actionUpdate: self.set_option(request, "Action", "Update") return self.__execute(self.op, request) def __findOrDelete(self, action, tableName, colNames, keys, options): tableName_getType = "" if tableName.find(".") > 0: tableName_getType = tableName.partition(".")[2] else: tableName_getType = tableName table = self.new_table(tableName) for v in colNames: self.add_column(table, v, eType.Object, 0, 0, None, None) if action == Connect.actionDelete: try: for i, v in enumerate(keys): row = self.new_row(table) if isinstance(v, int) or isinstance(v, int): self.bind_int(row, 0, v) elif isinstance(v, float): self.bind_double(row, 0, v) else: self.bind_string(row, 0, v) self.append(table, row) except Exception: raise Exception("keys type must consistent") request = self.new_request() self.set_table(request, table) self.set_option(request, "Reqid", "1") if action == Connect.actionSelect: self.set_option(request, "Action", "Select") if isinstance(options, dict): for key, value in list(options.items()): if isinstance(value, datetime.datetime): t = value.timetuple() timeStamp = int(time.mktime(t)) tStr = ( str(timeStamp) + str("%.3f" % (float(value.microsecond) / 1000000))[1:] ) else: tStr = str(value) self.set_option(request, key, tStr) elif options != None: raise Exception("options type must dict") if isinstance(keys[0], int): self.set_indices(request, None, len(keys), keys) elif isinstance(keys[0], str): self.set_indices_string(request, None, len(keys), keys) elif action == Connect.actionDelete: self.set_option(request, "Action", "Delete") return self.__execute(self.op, request) def insert(self, tableName, colNames, rows): "OPIO插入" if (not isinstance(tableName, str)) or ( not ((isinstance(colNames, list) or isinstance(colNames, tuple))) or ( not ( ( isinstance(rows, list) or isinstance(rows, tuple) or isinstance(rows, numpy.ndarray) ) ) ) ): raise Exception("insert parameter error") return self.__modify(Connect.actionInsert, tableName, colNames, rows) def delete(self, tableName, colNames, keys): "OPIO删除" if (not isinstance(tableName, str)) or ( not ((isinstance(colNames, list) or isinstance(colNames, tuple))) or (not ((isinstance(keys, list) or isinstance(keys, tuple)))) ): raise Exception("delete parameter error") return self.__findOrDelete( Connect.actionDelete, tableName, colNames, keys, None ) def update(self, tableName, colNames, rows): "OPIO更新" if (not isinstance(tableName, str)) or ( not ((isinstance(colNames, list) or isinstance(colNames, tuple))) or ( not ( ( isinstance(rows, list) or isinstance(rows, tuple) or isinstance(rows, numpy.ndarray) ) ) ) ): raise Exception("update parameter error") return self.__modify(Connect.actionUpdate, tableName, colNames, rows) def select(self, tableName, colNames, keys, options): "OPIO查询" if ( (not isinstance(tableName, str)) or ( not ((isinstance(colNames, list) or isinstance(colNames, tuple))) or (not ((isinstance(keys, list) or isinstance(keys, tuple)))) ) or ((options) and (not isinstance(options, dict))) ): raise Exception("select parameter error") return self.__findOrDelete( Connect.actionSelect, tableName, colNames, keys, options ) def openAsync(self, tableName, cb, keys): "异步订阅" async_request = self.new_request() table = self.new_table(tableName) self.add_column(table, "*", eType.Null, 0, 0, None, None) self.set_table(async_request, table) keyLen = len(keys) if keyLen <= 0: raise Exception("keys num is zero") if (not isinstance(keys, list)) and (not isinstance(keys, tuple)): raise Exception("keys type must be list or tuple") if isinstance(keys[0], str): self.set_indices_string(async_request, "GN", keyLen, keys) elif isinstance(keys[0], int): self.set_indices(async_request, "ID", keyLen, keys) else: raise Exception("key must be ID or GN") self.set_option(async_request, "Snapshot", "1") error = c_int(-1) ah = self.open_async(self.op, async_request, cb, None, error) if ah == None: if async_request != None: self.free_request(async_request) raise Exception("open_async error:" + str(error.value)) return Async(ah) class ResultSet(IO): "结果类" def __init__(self, opio, request, response): self.hasWall = True if self.has_wall(response) == 1 else False self.tableResult = self.get_table(response) self.request = request self.response = response self.opio = opio self.pos = 0 self.rowsNum = 0 self.columnsNum = 0 def __del__(self): self.close() def __nextContent(self): eof = 0 if eof == 0: rv = self.next_content(self.opio, self.tableResult, 1, eof) if rv != 0: self.opio = None self.pos = 0 self.rowsNum = self.row_count(self.tableResult) self.columnsNum = self.column_count(self.tableResult) return eof == 0 and self.pos < self.rowsNum def Next(self): "下移游标一行" if self.hasWall: return False self.pos += 1 if self.pos >= self.rowsNum: if not self.__nextContent(): return False self.set_rowid(self.tableResult, self.pos) return True def isHaveWall(self): return self.hasWall def getString(self, keyOrColumn): "根据列或字段获取字符串" val = None col = 0 if isinstance(keyOrColumn, str): col = self.column_index(self.tableResult, keyOrColumn) elif isinstance(keyOrColumn, int): col = keyOrColumn if col != -1: val = self.column_string(self.tableResult, col) if isinstance(val, bytes): val = bytes.decode(val) return val def getInt(self, keyOrColumn): "根据列或字段获取整型" num = None col = 0 if isinstance(keyOrColumn, str): col = self.column_index(self.tableResult, keyOrColumn) elif isinstance(keyOrColumn, int): col = keyOrColumn if col != -1: num = self.column_int(self.tableResult, col) return num def getFloat(self, keyOrColumn): "根据列或字段获取浮点型" num = None col = 0 if isinstance(keyOrColumn, str): col = self.column_index(self.tableResult, keyOrColumn) elif isinstance(keyOrColumn, int): col = keyOrColumn if col != -1: num = self.column_double(self.tableResult, col) return num def getDateTime(self, keyOrColumn): "根据列或字段获取Datetime结构体" date = None col = 0 if isinstance(keyOrColumn, str): col = self.column_index(self.tableResult, keyOrColumn) elif isinstance(keyOrColumn, int): col = keyOrColumn if col != -1: timestamp = self.column_double(self.tableResult, col) date = datetime.datetime.fromtimestamp(timestamp) return date def getValue(self, keyOrColumn): "根据列或字段获取值,返回值类型为实际类型" value = None col = 0 if isinstance(keyOrColumn, str): col = self.column_index(self.tableResult, keyOrColumn) elif isinstance(keyOrColumn, int): col = keyOrColumn if col != -1: type_ = self.value_type(self.tableResult, col) if ( type_ == Type.vtBool or type_ == Type.vtInt8 or type_ == Type.vtInt16 or type_ == Type.vtInt32 or type_ == Type.vtInt64 ): value = self.column_int(self.tableResult, col) elif type_ == Type.vtFloat or type_ == Type.vtDouble: value = self.column_double(self.tableResult, col) elif type_ == Type.vtDateTime: v = self.column_double(self.tableResult, col) value = datetime.datetime.fromtimestamp(v) elif type_ == Type.vtBinary: value = self.column_binary(self.tableResult, col) else: value = self.column_string(self.tableResult, col) if isinstance(value, bytes): value = bytes.decode(value) return value def columnLabel(self, column): "根据列获取对应字段名" name = None if isinstance(column, int): name = self.column_name(self.tableResult, column) return name def columnCount(self): "获取列个数" return self.column_count(self.tableResult) def rowCount(self): "获取行个数" return self.row_count(self.tableResult) def columnType(self, column): "获取列类型" colType = self.column_type(self.tableResult, column) if colType >= 1 and colType < 5: return eType.Int elif colType == 5: return eType.Long elif colType >= 6 and colType <= 7: return eType.Float else: return colType def valueType(self, column): "获取值类型" valueType = self.value_type(self.tableResult, column) if valueType >= 1 and valueType < 5: return eType.Int elif valueType == 5: return eType.Long elif valueType >= 6 and valueType <= 7: return eType.Float else: return valueType def close(self): "释放内存" if self.request != None: self.free_request(self.request) self.request = None if self.response != None: self.free_response(self.response) self.response = None class Async(IO): "异步订阅类" def __init__(self, ah): self.ah = ah def close(self): "关闭订阅" if self.ah != None: self.close_async(self.ah) self.ah = None def __subscribe(self, keys, onoff): keyLen = len(keys) if keyLen <= 0: raise Exception("keys num is zero") if (not isinstance(keys, list)) and (not isinstance(keys, tuple)): raise Exception("keys type must be list or tuple") if isinstance(keys[0], str): self.async_subscribe_tags(self.ah, keyLen, keys, onoff) elif isinstance(keys[0], int): self.async_subscribe(self.ah, keyLen, keys, onoff) else: raise Exception("key must be ID or GN") def add(self, keys): "动态添加" self.__subscribe(keys, 1) def remove(self, keys): "动态删除" self.__subscribe(keys, 0)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boyzhang666/data-analysys-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server