Skip to main content
Glama

mcp-oceanbase

Official
by oceanbase
server.py11.4 kB
from fastmcp import FastMCP from obshell import ClientSet from obshell.auth import PasswordAuth from obshell.model.tenant import ZoneParam from typing import Dict, Any import os import time import sys mcp = FastMCP("obshell-mcp") client = None SYS_PASSWORD = os.getenv("SYS_PASSWORD", "password") OBSHELL_HOST = os.getenv("OBSHELL_HOST", "127.0.0.1") OBSHELL_PORT = os.getenv("OBSHELL_PORT", "2886") OBSHELL_PORT = int(OBSHELL_PORT) CLUSTER_NAME = os.getenv("CLUSTER_NAME", "cluster") TENANT_NAME = os.getenv("TENANT_NAME", "tenant") create_cluster_desc = f"""Creates a new obcluster.The cluster name has been offered by user: {CLUSTER_NAME} 创建一个 ob 集群。该方法的响应时间有点长,最长不超过5分钟,当该方法中断或者超时的时候,后续应该先检查集群是否创建成功。 如果节点身份是 CLUSTER AGENT,则表示集群已经创建成功。 使用该方法时,应该让用户能够对参数进行修改。 当用户没有指定对应参数时: 1. 只启用本地127.0.0.1:2886上的节点, 不用启动其他多余的节点 2. servers_with_configs 默认使用 {{'127.0.0.1:2886': {{"zone": "zone1", "datafile_size": "8G", "cpu_count": "8", "memory_limit": "7G", "system_memory": "1G", "log_disk_size": "24G", "enable_syslog_recycle": "true", "enable_syslog_wf": "true", "__min_full_resource_pool_memory: "1073741824"}}}} 注意参数都使用字符串类型。 Creates a new obcluster with the specified servers and configurations. Args: servers_with_configs (list): The dict of the server and its configurations. The configuration should include the zone of the server. Example: {{'127.0.0.1:2886': {{"zone": "zone1", "datafile_size": "32G", "cpu_count": "8", "memory_limit": "7G", "system_memory": "1G", "log_disk_size": "32G", "enable_syslog_recycle": "true", "enable_syslog_wf": "true", "__min_full_resource_pool_memory: 1073741824}}}} cluster_id (int): The id of the obcluster. Returns: Task detail as task.DagDetailDTO. Raises: OBShellHandleError: error message return by OBShell server. TaskExecuteFailedError: raise when the task failed, include the failed task detail and logs. IllegalOperatorError: raise when the operator is illegal. """ create_tenant_desc = f"""Create a tenant synchronously.The tenant name has been offered by user: {TENANT_NAME} Creates a tenant with the specified name and zone list. 创建业务租户时,需要指定所使用的 unit config。 当用户没有指定对应参数时: 1. 推荐使用参数:{{ memory_size: 2G; cpu_count: 1; unit_num: 1; "zone_replica_type": {{"zone1":"FULL"}}}}, 其他参数可以随意填写 2. variables参数和parameters参数留空 Args: zone_replica_type (Dict[str, str]): The map of the zone and its replica type(FULL or READONLY), default is FULL. memory_size (str, optional): The memory size when create a new unit config. cpu_count (int, optional): The cpu count when create a new unit config. unit_num (int, optional): The unit number when create a new unit config. log_disk_size (str, optional): The log disk size when create a new unit config. mode (str, optional): The mode of the tenant, "MYSQL" or "ORACLE". Defaults to 'MYSQL'. primary_zone (str, optional): The primary zone of the tenant. Defaults to "RANDOM". whitelist (str, optional): The whitelist of the tenant. Defaults to "%". scenario: The scenario of the tenant. Can be one of 'express_oltp', 'complex_oltp', 'olap', 'htap', 'kv'. Defaults to 'oltp'. root_password (str, optional): The root password of the tenant. Defaults to Empty. import_script (bool, optional): Whether need to import the observer's script. Defaults to False. Support from OBShell V4.2.4.2. charset (str, optional): The charset of the tenant. If not set, the charset will be set to default value by observer. collation (str, optional): The collation of the tenant. If not set, the collation will be set to default value by observer. read_only (bool, optional): Whether the tenant is read only. Defaults to False. comment (str, optional): The comment of the tenant. Returns: Task detail as task.DagDetailDTO. Raises: OBShellHandleError: error message return by OBShell server. TaskExecuteFailedError: raise when the task failed, include the failed task detail and logs. """ # @mcp.tool() def connect( host: str = OBSHELL_HOST, port: int = OBSHELL_PORT, password: str = SYS_PASSWORD, timeout: int = 600, ): """连接 obshell 服务 参数: host: obshell 服务地址, 默认为 "127.0.0.1" port: obshell 服务端口, 默认为 "2886" password: sys 租户的 root 密码, 默认为 timeout: 时间戳超时时间,单位为秒,默认 600 秒 返回: client: obshell 客户端实例,后续所有 obshell 的 sdk 方法,都需要使用该实例 当 client 连接成功后,会自动创建一个 obshell 的 session,后续所有 obshell 的 sdk 方法,都需要使用该 session。 当该 session 使用出错时,需要重新调用 connect 方法,重新创建一个 session。 """ global client client = ClientSet(host, port, auth=PasswordAuth(password), timeout=timeout) try: client.v1.get_ob_info() except Exception as e: raise Exception("连接 obshell 服务失败: " + str(e)) return client.v1.get_status() @mcp.tool(description=create_cluster_desc) def create_cluster(servers_with_configs: dict, cluster_id: int): root_pwd = SYS_PASSWORD global client if client is None: connect() try: client.v1.agg_create_cluster( servers_with_configs, CLUSTER_NAME, cluster_id, root_pwd, clear_if_failed=True, ) except Exception as e: raise Exception("创建 ob 集群失败: " + str(e)) client.v1._reset_auth() return "success!" @mcp.tool(description=create_tenant_desc) def create_tenant( zone_replica_type: Dict[str, str], memory_size: str = "2G", cpu_count: int = 1, unit_num: int = 1, log_disk_size: str = "", mode: str = "MYSQL", primary_zone: str = "RANDOM", whitelist: str = "%", scenario: str = None, import_script: bool = False, charset: str = None, collation: str = None, read_only: bool = False, comment: str = None, variables: dict = None, parameters: dict = None, ): # Global variable `secure_file_priv` take a while to take effect, so we set it early (right after tenant creation) # if not variables: # variables = {} # variables["secure_file_priv"] = "/" root_password = SYS_PASSWORD global client if client is None: connect() # 创建一个新的 unit config unit_config_name = f"{TENANT_NAME}_unit_config_{int(time.time())}" client.v1.create_resource_unit_config( unit_config_name, memory_size, cpu_count, log_disk_size=(None if log_disk_size == "" else log_disk_size), ) zone_list = [ ZoneParam(zone, unit_config_name, unit_num, zone_replica_type.get(zone, "FULL")) for zone in zone_replica_type ] return client.v1.create_tenant_sync( TENANT_NAME, zone_list, mode, primary_zone, whitelist, root_password, scenario, import_script, charset, collation, read_only, comment, variables, parameters, ) @mcp.tool() def get_all_obshell_sdk_methods(): """获取 obshell 所有可供用户使用的 sdk 方法 只有当其他的 tool 不适用时,才通过该方法获取所有 sdk 方法。 """ global client if client is None: connect() methods = {} for method in dir(client.v1): if not method.startswith("_") and not method.startswith("agg"): methed = getattr(client.v1, method) if callable(methed): methods[method] = methed.__doc__ return methods @mcp.tool() def get_obshell_sdk_methods_description(sdk_method: str): """获取 obshell 的 sdk 方法的描述""" global client if client is None: connect() methed = getattr(client.v1, sdk_method) if not callable(methed): raise Exception("sdk 方法不存在: " + sdk_method) return methed.__doc__ @mcp.tool() def call_obshell_sdk(sdk_method: str, args: Dict[str, Any]): """调用 obshell 的 sdk 方法 参数: sdk_method: obshell 的 sdk 方法名,请根据 sdk 的描述文档,选择一个合适的 sdk 方法。 args: obshell 的 sdk 方法参数,参数名和参数值需要使用 map 格式,例如: {"arg1": "value1", "arg2": "value2"} 如果有任何必选参数没有设置,都直接返回错误信息,不要尝试调用 obshell 的 sdk 方法。 返回: result: obshell 的 sdk 方法返回结果 """ if client is None: connect() print("调用 obshell 的 sdk 方法: " + sdk_method) # 转换特殊参数格式 processed_args = args.copy() # 如果有 zone_list 参数,转换为 ZoneParam 对象列表 if "zone_list" in processed_args and isinstance(processed_args["zone_list"], list): zone_objects = [] for zone_data in processed_args["zone_list"]: if isinstance(zone_data, dict): # 创建 ZoneParam 对象 zone_name = zone_data.get("zone") unit_config_name = zone_data.get( "unit_config", zone_data.get("unit_config_name") ) unit_num = zone_data.get("unit_num", 1) replica_type = zone_data.get("replica_type") if zone_name and unit_config_name: zone_param = ZoneParam( zone_name, unit_config_name, unit_num, replica_type ) zone_objects.append(zone_param) else: raise Exception( "zone_list 参数格式错误:缺少必要字段 zone 或 unit_config/unit_config_name" ) else: zone_objects.append(zone_data) # 已经是对象 processed_args["zone_list"] = zone_objects try: return getattr(client.v1, sdk_method)(**processed_args) except Exception as e: raise Exception("调用 obshell 的 sdk 方法失败: " + str(e)) def main(): SSE = False SSE_PORT = 8000 if len(sys.argv) > 1 and sys.argv[1] == "--sse": SSE = True if len(sys.argv) > 2: SSE_PORT = int(sys.argv[2]) else: SSE_PORT = 8000 if SSE: mcp.run(transport="sse", host="0.0.0.0", port=SSE_PORT, path="/obshell") else: mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/oceanbase/mcp-oceanbase'

If you have feedback or need assistance with the MCP directory API, please join our Discord server