Skip to main content
Glama
ocp_tool.py27.9 kB
"""OCP API tool wrapper module""" import logging from typing import Any, Dict, List, Optional from .config import get_config from .ocp_client import OCPClient logger = logging.getLogger(__name__) _ocp_client: Optional[OCPClient] = None def get_ocp_client() -> OCPClient: """Lazily initialize and return the shared OCP client.""" global _ocp_client if _ocp_client is None: config = get_config() _ocp_client = OCPClient( host=config.ocp_url, access_key_id=config.ocp_access_key_id, access_key_secret=config.ocp_access_key_secret, ) return _ocp_client def get_clusters( page: int = 1, size: int = 10, sort: Optional[str] = None, name: Optional[str] = None, status: Optional[str] = None, ) -> Dict[str, Any]: try: # Build query parameters params = {"page": str(page), "size": str(size)} if sort: params["sort"] = sort if name: params["name"] = name if status and len(status) > 0: params["status"] = status logger.debug(f"Query parameters: {params}") result = get_ocp_client().get("/api/v2/ob/clusters", params=params) return result except Exception as e: logger.error(f"Failed to list OceanBase clusters: {e}") raise def get_cluster_zones(cluster_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get(f"/api/v2/ob/clusters/{cluster_id}/zones") return result except Exception as e: logger.error(f"Failed to get zones for cluster {cluster_id}: {e}") raise def get_cluster_servers( cluster_id: int, region_name: Optional[str] = None, idc_name: Optional[str] = None, ) -> Dict[str, Any]: try: params = {} if region_name: params["regionName"] = region_name if idc_name: params["idcName"] = idc_name result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/servers", params=params if params else None, ) return result except Exception as e: logger.error(f"Failed to get servers for cluster {cluster_id}: {e}") raise def get_zone_servers( cluster_id: int, zone_name: str, ) -> Dict[str, Any]: try: result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/zones/{zone_name}/servers" ) return result except Exception as e: logger.error( f"Failed to get servers for cluster {cluster_id} zone {zone_name}: {e}" ) raise def get_cluster_stats(cluster_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get(f"/api/v2/ob/clusters/{cluster_id}/stats") return result except Exception as e: logger.error(f"Failed to get stats for cluster {cluster_id}: {e}") raise def get_cluster_server_stats(cluster_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get(f"/api/v2/ob/clusters/{cluster_id}/serverStats") return result except Exception as e: logger.error(f"Failed to get server stats for cluster {cluster_id}: {e}") raise def get_cluster_units(cluster_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get(f"/api/v2/ob/clusters/{cluster_id}/units") return result except Exception as e: logger.error(f"Failed to get units for cluster {cluster_id}: {e}") raise def get_cluster_tenants( cluster_id: int, page: int = 1, size: int = 10, sort: Optional[str] = None, name: Optional[str] = None, mode: Optional[str] = None, status: Optional[str] = None, ) -> Dict[str, Any]: try: params = {"page": str(page), "size": str(size)} if sort: params["sort"] = sort if name: params["name"] = name if mode: params["mode"] = mode if status: params["status"] = status result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants", params=params ) return result except Exception as e: logger.error(f"Failed to get tenants for cluster {cluster_id}: {e}") raise def get_all_tenants( page: int = 1, size: int = 10, sort: Optional[str] = None, name: Optional[str] = None, mode: Optional[str] = None, status: Optional[str] = None, ) -> Dict[str, Any]: try: params = {"page": str(page), "size": str(size)} if sort: params["sort"] = sort if name: params["name"] = name if mode: params["mode"] = mode if status: params["status"] = status result = get_ocp_client().get("/api/v2/ob/tenants", params=params) return result except Exception as e: logger.error(f"Failed to get all tenants: {e}") raise def get_tenant_detail(cluster_id: int, tenant_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}" ) return result except Exception as e: logger.error( f"Failed to get tenant detail for cluster {cluster_id} tenant {tenant_id}: {e}" ) raise def get_tenant_units( cluster_id: int, tenant_id: int, zone_name: Optional[str] = None, ) -> Dict[str, Any]: try: params = {} if zone_name: params["zoneName"] = zone_name result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/units", params=params if params else None, ) return result except Exception as e: logger.error( f"Failed to get units for cluster {cluster_id} tenant {tenant_id}: {e}" ) raise def get_tenant_parameters(cluster_id: int, tenant_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/parameters" ) return result except Exception as e: logger.error( f"Failed to get parameters for cluster {cluster_id} tenant {tenant_id}: {e}" ) raise def get_cluster_parameters(cluster_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get(f"/api/v2/ob/clusters/{cluster_id}/parameters") return result except Exception as e: logger.error(f"Failed to get parameters for cluster {cluster_id}: {e}") raise def set_tenant_parameters( cluster_id: int, tenant_id: int, parameters: List[Dict[str, Any]], ) -> Dict[str, Any]: if not parameters: raise ValueError("parameters list cannot be empty") payload: List[Dict[str, Any]] = [] for index, param in enumerate(parameters): if "name" not in param or not param["name"]: raise ValueError(f"parameters[{index}] is missing required field 'name'") if "value" not in param: raise ValueError(f"parameters[{index}] is missing required field 'value'") if "parameterType" not in param or not param["parameterType"]: raise ValueError( f"parameters[{index}] is missing required field 'parameterType'" ) payload.append( { "name": str(param["name"]), "value": param["value"], "parameterType": str(param["parameterType"]), } ) try: result = get_ocp_client().put( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/parameters", json=payload, ) return result except Exception as e: logger.error( f"Failed to set parameters for cluster {cluster_id} tenant {tenant_id}: {e}" ) raise def set_cluster_parameters( cluster_id: int, parameters: List[Dict[str, Any]], ) -> Dict[str, Any]: if not parameters: raise ValueError("parameters list cannot be empty") payload: List[Dict[str, Any]] = [] for index, param in enumerate(parameters): if "name" not in param or not param["name"]: raise ValueError(f"parameters[{index}] is missing required field 'name'") if "value" not in param: raise ValueError(f"parameters[{index}] is missing required field 'value'") payload.append( { "name": str(param["name"]), "value": param["value"], } ) try: result = get_ocp_client().put( f"/api/v2/ob/clusters/{cluster_id}/parameters", json=payload, ) return result except Exception as e: logger.error(f"Failed to set parameters for cluster {cluster_id}: {e}") raise def get_obproxy_clusters( page: int = 1, size: int = 10, ) -> Dict[str, Any]: try: params = {"page": str(page), "size": str(size)} result = get_ocp_client().get("/api/v2/obproxy/clusters", params=params) return result except Exception as e: logger.error(f"Failed to get OBProxy clusters: {e}") raise def get_obproxy_cluster_detail(cluster_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get(f"/api/v2/obproxy/clusters/{cluster_id}") return result except Exception as e: logger.error( f"Failed to get OBProxy cluster detail for cluster {cluster_id}: {e}" ) raise def get_obproxy_cluster_parameters(cluster_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get( f"/api/v2/obproxy/clusters/{cluster_id}/parameters" ) return result except Exception as e: logger.error(f"Failed to get parameters for OBProxy cluster {cluster_id}: {e}") raise def get_tenant_databases(cluster_id: int, tenant_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/databases" ) return result except Exception as e: logger.error( f"Failed to get databases for cluster {cluster_id} tenant {tenant_id}: {e}" ) raise def get_tenant_users(cluster_id: int, tenant_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/users" ) return result except Exception as e: logger.error( f"Failed to get users for cluster {cluster_id} tenant {tenant_id}: {e}" ) raise def get_tenant_user_detail( cluster_id: int, tenant_id: int, username: str, host_name: Optional[str] = None, ) -> Dict[str, Any]: try: params = {} if host_name: params["hostName"] = host_name result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/users/{username}", params=params if params else None, ) return result except Exception as e: logger.error( f"Failed to get user detail for cluster {cluster_id} tenant {tenant_id} user {username}: {e}" ) raise def get_tenant_roles(cluster_id: int, tenant_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/roles" ) return result except Exception as e: logger.error( f"Failed to get roles for cluster {cluster_id} tenant {tenant_id}: {e}" ) raise def get_tenant_role_detail( cluster_id: int, tenant_id: int, role_name: str ) -> Dict[str, Any]: try: result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/roles/{role_name}" ) return result except Exception as e: logger.error( f"Failed to get role detail for cluster {cluster_id} tenant {tenant_id} role {role_name}: {e}" ) raise def get_tenant_objects(cluster_id: int, tenant_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/objects" ) return result except Exception as e: logger.error( f"Failed to get objects for cluster {cluster_id} tenant {tenant_id}: {e}" ) raise def get_metric_groups( type: str, scope: str, page: int = 1, size: int = 10, sort: Optional[str] = None, target: Optional[str] = None, target_id: int = None, ) -> Dict[str, Any]: try: params = {"type": type, "scope": scope, "page": str(page), "size": str(size)} if sort: params["sort"] = sort if target: params["target"] = target if target_id is not None: params["targetId"] = target_id result = get_ocp_client().get("/api/v2/monitor/metricGroups", params=params) return result except Exception as e: logger.error(f"Failed to get metric groups: {e}") raise def get_metric_data_with_label( start_time: str, end_time: str, metrics: str, group_by: str, interval: int, labels: str, min_step: int = None, max_points: int = None, ) -> Dict[str, Any]: try: params = { "startTime": start_time, "endTime": end_time, "metrics": metrics, "groupBy": group_by, "interval": str(interval), "labels": labels, } if min_step is not None: params["minStep"] = min_step if max_points is not None: params["maxPoints"] = max_points result = get_ocp_client().get("/api/v2/monitor/metricsWithLabel", params=params) return result except Exception as e: logger.error(f"Failed to get metric data with label: {e}") raise def get_alarms( page: int = 1, size: int = 10, app_type: Optional[str] = None, scope: Optional[str] = None, level: int = None, status: Optional[str] = None, active_at_start: str = None, active_at_end: str = None, is_subscribed_by_me: bool = None, keyword: Optional[str] = None, ) -> Dict[str, Any]: try: params = {"page": str(page), "size": str(size)} if app_type: params["appType"] = app_type if scope: params["scope"] = scope if level is not None: params["level"] = level if status: params["status"] = status if active_at_start: params["activeAtStart"] = active_at_start if active_at_end: params["activeAtEnd"] = active_at_end if is_subscribed_by_me is not None: params["isSubscribedByMe"] = str(is_subscribed_by_me).lower() if keyword: params["keyword"] = keyword result = get_ocp_client().get("/api/v2/alarm/alarms", params=params) return result except Exception as e: logger.error(f"Failed to get alarms: {e}") raise def get_alarm_detail(alarm_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get(f"/api/v2/alarm/alarms/{alarm_id}") return result except Exception as e: logger.error(f"Failed to get alarm {alarm_id}: {e}") raise def get_inspection_tasks( inspectionObjectTypes: Optional[str] = None, tags: Optional[str] = None, taskStates: Optional[str] = None, name: Optional[str] = None, ) -> Dict[str, Any]: try: params = {} if inspectionObjectTypes: params["inspectionObjectTypes"] = inspectionObjectTypes if tags: params["tags"] = tags if taskStates: params["taskStates"] = taskStates if name: params["name"] = name result = get_ocp_client().get( "/api/v2/inspection/task", params=params if params else None ) return result except Exception as e: logger.error(f"Failed to get inspection tasks: {e}") raise def get_inspection_overview( object_ids: str = None, inspection_object_type: Optional[str] = None, schedule_states: Optional[str] = None, name: Optional[str] = None, parent_name: Optional[str] = None, ) -> Dict[str, Any]: try: params = {} if object_ids: params["objectIds"] = object_ids if inspection_object_type: params["inspectionObjectType"] = inspection_object_type if schedule_states: params["scheduleStates"] = schedule_states if name: params["name"] = name if parent_name: params["parentName"] = parent_name result = get_ocp_client().get( "/api/v2/inspection/overview", params=params if params else None ) return result except Exception as e: logger.error(f"Failed to get inspection overview: {e}") raise def get_inspection_report(report_id: int) -> Dict[str, Any]: try: result = get_ocp_client().get(f"/api/v2/inspection/report/{report_id}") return result except Exception as e: logger.error(f"Failed to get inspection report {report_id}: {e}") raise def run_inspection( inspection_object_type: str, object_ids: str, tag: int, ) -> Dict[str, Any]: if not inspection_object_type: raise ValueError("inspection_object_type is required") if not object_ids: raise ValueError("object_ids list cannot be empty") if not tag: raise ValueError("tags is required") # Validate inspection_object_type valid_types = ["OB_CLUSTER", "OB_TENANT", "HOST", "OB_PROXY"] if inspection_object_type not in valid_types: raise ValueError(f"inspection_object_type must be one of {valid_types}") # Validate tags valid_tags = [1, 2, 3, 4] if tag not in valid_tags: raise ValueError(f"tag must be one of {valid_tags}") try: params = {} if inspection_object_type is not None: params["inspectionObjectType"] = inspection_object_type if object_ids is not None: params["objectIds"] = object_ids if tag is not None: params["tag"] = tag result = get_ocp_client().post( "/api/v2/inspection/run", params=params if params else None ) return result except Exception as e: logger.error(f"Failed to run inspection: {e}") raise def get_inspection_item_last_result( item_id: int, tag_id: int, object_type: str, object_id: Optional[int] = None, ) -> Dict[str, Any]: if not item_id: raise ValueError("item_id is required") if not tag_id: raise ValueError("tag_id is required") if not object_type: raise ValueError("object_type is required") # Validate tag_id valid_tags = [1, 2, 3, 4] if tag_id not in valid_tags: raise ValueError(f"tag_id must be one of {valid_tags}") # Validate object_type valid_types = ["OB_CLUSTER", "OB_TENANT", "HOST", "OB_PROXY"] if object_type not in valid_types: raise ValueError(f"object_type must be one of {valid_types}") params = { "tagId": str(tag_id), "objectType": object_type, } if object_id is not None: params["objectId"] = str(object_id) try: result = get_ocp_client().get( f"/api/v2/inspection/report/info/item/{item_id}", params=params ) return result except Exception as e: logger.error( f"Failed to get inspection item last result for item {item_id}: {e}" ) raise def get_inspection_report_info( tag_id: int, object_type: str, object_id: Optional[int] = None, ) -> Dict[str, Any]: if not tag_id: raise ValueError("tag_id is required") if not object_type: raise ValueError("object_type is required") # Validate tag_id valid_tags = [1, 2, 3, 4] if tag_id not in valid_tags: raise ValueError(f"tag_id must be one of {valid_tags}") # Validate object_type valid_types = ["OB_CLUSTER", "OB_TENANT", "HOST", "OB_PROXY"] if object_type not in valid_types: raise ValueError(f"object_type must be one of {valid_types}") params = { "tagId": str(tag_id), "objectType": object_type, } if object_id is not None: params["objectId"] = str(object_id) try: result = get_ocp_client().get("/api/v2/inspection/report/info", params=params) return result except Exception as e: logger.error(f"Failed to get inspection report info: {e}") raise def get_tenant_top_sql( cluster_id: int, tenant_id: int, start_time: str, end_time: str, inner: bool = None, server_id: int = None, sql_text: Optional[str] = None, search_attr: Optional[str] = None, search_op: Optional[str] = None, search_val: Optional[str] = None, ) -> Dict[str, Any]: try: params = { "startTime": start_time, "endTime": end_time, } if server_id is not None: params["serverId"] = str(server_id) if inner is not None: params["inner"] = str(inner).lower() if sql_text: params["sqlText"] = sql_text if search_attr: params["searchAttr"] = search_attr if search_op: params["searchOp"] = search_op if search_val: params["searchVal"] = search_val result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/topSql", params=params, ) return result except Exception as e: logger.error(f"Failed to get tenant top SQL: {e}") raise def get_sql_text( cluster_id: int, tenant_id: int, sql_id: str, start_time: str, end_time: str, db_name: Optional[str] = None, ) -> Dict[str, Any]: try: params = { "startTime": start_time, "endTime": end_time, } if db_name is not None: params["dbName"] = db_name result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/sqls/{sql_id}/text", params=params, ) return result except Exception as e: logger.error(f"Failed to get SQL text: {e}") raise def get_tenant_slow_sql( cluster_id: int, tenant_id: int, start_time: str, end_time: str, server_id: Optional[int] = None, inner: Optional[bool] = None, sql_text: Optional[str] = None, filter_expression: Optional[str] = None, limit: Optional[int] = None, sql_text_length: Optional[int] = None, ) -> Dict[str, Any]: try: params = { "startTime": start_time, "endTime": end_time, } if server_id is not None: params["serverId"] = str(server_id) if inner is not None: params["inner"] = str(inner).lower() if sql_text: params["sqlText"] = sql_text if filter_expression: params["filterExpression"] = filter_expression if limit is not None: params["limit"] = str(limit) if sql_text_length is not None: params["sqlTextLength"] = str(sql_text_length) result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/tenants/{tenant_id}/slowSql", params=params, ) return result except Exception as e: logger.error(f"Failed to get tenant slow SQL: {e}") raise def create_performance_report( cluster_id: int, start_snapshot_id: int, end_snapshot_id: int, name: str, ) -> Dict[str, Any]: try: params = {} if name: params["name"] = name if start_snapshot_id is not None: params["startSnapshotId"] = str(start_snapshot_id) if end_snapshot_id is not None: params["endSnapshotId"] = str(end_snapshot_id) result = get_ocp_client().post( f"/api/v2/ob/clusters/{cluster_id}/performance/workload/reports", params=params if params else None, ) return result except Exception as e: logger.error(f"Failed to create performance report: {e}") raise def get_cluster_snapshots(cluster_id: int) -> Dict[str, Any]: """ Query cluster snapshot information This interface is used to query snapshot information of a specified cluster. The caller must have read and write permissions for the specified cluster. Args: cluster_id: The ID of the target OceanBase cluster Returns: Dictionary containing snapshot list with snapshotId and snapshotTime """ try: result = get_ocp_client().get( f"/api/v2/ob/clusters/{cluster_id}/performance/workload/snapshots" ) return result except Exception as e: logger.error(f"Failed to get snapshots for cluster {cluster_id}: {e}") raise def get_performance_report( cluster_id: int, report_id: int, directory: Optional[str] = None, ) -> Dict[str, Any]: """ Query performance report This interface is used to query cluster performance report. The caller must have read and write permissions for the specified cluster. Note: This endpoint returns binary HTML content that can be saved as an HTML file. Args: cluster_id: The ID of the target OceanBase cluster report_id: Performance report ID directory: The directory where the HTML report is saved (required), and the absolute path is not empty Returns: Binary HTML content """ try: import base64 # Add query parameters as required by the API (even though they're in the path) params = { "id": str(cluster_id), "reportId": str(report_id), } # Set Accept header to accept any content type (like browser does) headers = { "Accept": "*/*", } binary_content = get_ocp_client().get_binary( f"/api/v2/ob/clusters/{cluster_id}/performance/workload/reports/{report_id}", params=params, headers=headers, ) # Encode binary content as base64 for JSON serialization content_base64 = base64.b64encode(binary_content).decode("utf-8") if content_base64: html_content = base64.b64decode(content_base64) output_file = ( f"{directory}/performance_report_{cluster_id}_{report_id}.html" ) with open(output_file, "wb") as f: f.write(html_content) print(f"✓ HTML report saved to: {output_file}") if output_file: return { "success": True, "cluster_id": cluster_id, "report_id": report_id, "output_file": output_file, "message": f"HTML report saved successfully to {output_file}", } else: return { "success": False, "cluster_id": cluster_id, "report_id": report_id, "message": f"Failed to get performance report {report_id} for cluster {cluster_id}", } except Exception as e: logger.error( f"Failed to get performance report {report_id} for cluster {cluster_id}: {e}" ) raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/oceanbase/mcp-oceanbase'

If you have feedback or need assistance with the MCP directory API, please join our Discord server