Skip to main content
Glama

Alibaba Cloud Operations MCP Server

by RadiumGu
oos_tools.py10.1 kB
from pydantic import Field from typing import List import os import json import time from alibabacloud_oos20190601.client import Client as oos20190601Client from alibabacloud_oos20190601 import models as oos_20190601_models from alibaba_cloud_ops_mcp_server.alibabacloud.utils import create_config from alibaba_cloud_ops_mcp_server.alibabacloud import exception END_STATUSES = [SUCCESS, FAILED, CANCELLED] = ['Success', 'Failed', 'Cancelled'] tools = [] def create_client(region_id: str) -> oos20190601Client: config = create_config() config.endpoint = f'oos.{region_id}.aliyuncs.com' return oos20190601Client(config) def _start_execution_sync(region_id: str, template_name: str, parameters: dict): client = create_client(region_id=region_id) start_execution_request = oos_20190601_models.StartExecutionRequest( region_id=region_id, template_name=template_name, parameters=json.dumps(parameters) ) start_execution_resp = client.start_execution(start_execution_request) execution_id = start_execution_resp.body.execution.execution_id while True: list_executions_request = oos_20190601_models.ListExecutionsRequest( region_id=region_id, execution_id=execution_id ) list_executions_resp = client.list_executions(list_executions_request) status = list_executions_resp.body.executions[0].status if status == FAILED: status_message = list_executions_resp.body.executions[0].status_message raise exception.OOSExecutionFailed(reason=status_message) elif status in END_STATUSES: return list_executions_resp.body time.sleep(1) @tools.append def OOS_RunCommand( Command: str = Field(description='Content of the command executed on the ECS instance'), InstanceIds: List[str] = Field(description='AlibabaCloud ECS instance ID List'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou'), CommandType: str = Field(description='The type of command executed on the ECS instance, optional value:RunShellScript,RunPythonScript,RunPerlScript,RunBatScript,RunPowerShellScript', default='RunShellScript') ): """批量在多台ECS实例上运行云助手命令,适用于需要同时管理多台ECS实例的场景,如应用程序管理和资源标记操作等。""" parameters = { 'regionId': RegionId, 'resourceType': 'ALIYUN::ECS::Instance', 'targets': { 'ResourceIds': InstanceIds, 'RegionId': RegionId, 'Type': 'ResourceIds', 'Parameters': { 'RegionId': RegionId, 'Status': 'Running' } }, "commandType": CommandType, "commandContent": Command } return _start_execution_sync(region_id=RegionId, template_name='ACS-ECS-BulkyRunCommand', parameters=parameters) @tools.append def OOS_StartInstances( InstanceIds: List[str] = Field(description='AlibabaCloud ECS instance ID List'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou'), ): """批量启动ECS实例,适用于需要同时管理和启动多台ECS实例的场景,例如应用部署和高可用性场景。""" parameters = { 'regionId': RegionId, 'resourceType': 'ALIYUN::ECS::Instance', 'targets': { 'ResourceIds': InstanceIds, 'RegionId': RegionId, 'Type': 'ResourceIds' } } return _start_execution_sync(region_id=RegionId, template_name='ACS-ECS-BulkyStartInstances', parameters=parameters) @tools.append def OOS_StopInstances( InstanceIds: List[str] = Field(description='AlibabaCloud ECS instance ID List'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou'), ForeceStop: bool = Field(description='Is forced shutdown required', default=False) ): """批量停止ECS实例,适用于需要同时管理和停止多台ECS实例的场景。""" parameters = { 'regionId': RegionId, 'resourceType': 'ALIYUN::ECS::Instance', 'targets': { 'ResourceIds': InstanceIds, 'RegionId': RegionId, 'Type': 'ResourceIds' }, 'forceStop': ForeceStop } return _start_execution_sync(region_id=RegionId, template_name='ACS-ECS-BulkyStopInstances', parameters=parameters) @tools.append def OOS_RebootInstances( InstanceIds: List[str] = Field(description='AlibabaCloud ECS instance ID List'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou'), ForeceStop: bool = Field(description='Is forced shutdown required', default=False) ): """批量重启ECS实例,适用于需要同时管理和重启多台ECS实例的场景。""" parameters = { 'regionId': RegionId, 'resourceType': 'ALIYUN::ECS::Instance', 'targets': { 'ResourceIds': InstanceIds, 'RegionId': RegionId, 'Type': 'ResourceIds' }, 'forceStop': ForeceStop } return _start_execution_sync(region_id=RegionId, template_name='ACS-ECS-BulkyRebootInstances', parameters=parameters) @tools.append def OOS_RunInstances( ImageId: str = Field(description='Image ID'), InstanceType: str = Field(description='Instance Type'), SecurityGroupId: str = Field(description='SecurityGroup ID'), VSwitchId: str = Field(description='VSwitch ID'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou'), Amount: int = Field(description='Number of ECS instances', default=1), InstanceName: str = Field(description='Instance Name', default='') ): """批量创建ECS实例,适用于需要同时创建多台ECS实例的场景,例如应用部署和高可用性场景。""" parameters = { 'imageId': ImageId, 'instanceType': InstanceType, 'securityGroupId': SecurityGroupId, 'vSwitchId': VSwitchId, 'amount': Amount, 'instanceName': InstanceName } return _start_execution_sync(region_id=RegionId, template_name='ACS-ECS-RunInstances', parameters=parameters) @tools.append def OOS_ResetPassword( InstanceIds: List[str] = Field(description='AlibabaCloud ECS instance ID List'), Password: str = Field(description='The password of the ECS instance must be 8-30 characters and must contain only the following characters: lowercase letters, uppercase letters, numbers, and special characters only.()~!@#$%^&*-_+=(40:<>,?/'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou'), ): """批量修改ECS实例的密码,请注意,本操作将会重启ECS实例""" parameters = { 'regionId': RegionId, 'resourceType': 'ALIYUN::ECS::Instance', 'targets': { 'ResourceIds': InstanceIds, 'RegionId': RegionId, 'Type': 'ResourceIds' }, 'password': Password } return _start_execution_sync(region_id=RegionId, template_name='ACS-ECS-BulkyResetPassword', parameters=parameters) @tools.append def OOS_ReplaceSystemDisk( InstanceIds: List[str] = Field(description='AlibabaCloud ECS instance ID List'), ImageId: str = Field(description='Image ID'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou') ): """批量替换ECS实例的系统盘,更换操作系统""" parameters = { 'regionId': RegionId, 'resourceType': 'ALIYUN::ECS::Instance', 'targets': { 'ResourceIds': InstanceIds, 'RegionId': RegionId, 'Type': 'ResourceIds' }, 'imageId': ImageId } return _start_execution_sync(region_id=RegionId, template_name='ACS-ECS-BulkyReplaceSystemDisk', parameters=parameters) @tools.append def OOS_StartRDSInstances( InstanceIds: List[str] = Field(description='AlibabaCloud ECS instance ID List'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou') ): """批量启动RDS实例,适用于需要同时管理和启动多台RDS实例的场景,例如应用部署和高可用性场景。""" parameters = { 'regionId': RegionId, 'resourceType': 'ALIYUN::RDS::Instance', 'targets': { 'ResourceIds': InstanceIds, 'RegionId': RegionId, 'Type': 'ResourceIds' } } return _start_execution_sync(region_id=RegionId, template_name='ACS-RDS-BulkyStartInstances', parameters=parameters) @tools.append def OOS_StopRDSInstances( InstanceIds: List[str] = Field(description='AlibabaCloud RDS instance ID List'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou') ): """批量停止RDS实例,适用于需要同时管理和停止多台RDS实例的场景。""" parameters = { 'regionId': RegionId, 'resourceType': 'ALIYUN::RDS::Instance', 'targets': { 'ResourceIds': InstanceIds, 'RegionId': RegionId, 'Type': 'ResourceIds' } } return _start_execution_sync(region_id=RegionId, template_name='ACS-RDS-BulkyStopInstances', parameters=parameters) @tools.append def OOS_RebootRDSInstances( InstanceIds: List[str] = Field(description='AlibabaCloud RDS instance ID List'), RegionId: str = Field(description='AlibabaCloud region ID', default='cn-hangzhou') ): """批量重启RDS实例,适用于需要同时管理和重启多台RDS实例的场景。""" parameters = { 'regionId': RegionId, 'resourceType': 'ALIYUN::RDS::Instance', 'targets': { 'ResourceIds': InstanceIds, 'RegionId': RegionId, 'Type': 'ResourceIds' } } return _start_execution_sync(region_id=RegionId, template_name='ACS-RDS-BulkyRestartInstances', parameters=parameters)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/RadiumGu/alicloud-ops-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server