Skip to main content
Glama
aliyun

Alibaba Cloud RDS OpenAPI MCP Server

Official
by aliyun

query_sql

Execute read-only SQL queries on Alibaba Cloud RDS databases to retrieve data or show database information using instance ID, region, and database name.

Instructions

execute read-only sql likes show xxx, select xxx Args: dbinstance_id (str): The ID of the RDS instance. region_id(str): the region id of instance. db_name(str): the db name for execute sql. sql(str): the sql to be executed. Returns: the sql result.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
region_idYes
dbinstance_idYes
db_nameYes
sqlYes

Implementation Reference

  • The core handler function for the 'query_sql' MCP tool. It creates a temporary read-only database account if needed, connects to the specified RDS instance and database, executes the provided read-only SQL query, and returns the results as JSON.
    async def query_sql( region_id: str, dbinstance_id: str, db_name: str, sql: str ) -> str: """ execute read-only sql likes show xxx, select xxx Args: dbinstance_id (str): The ID of the RDS instance. region_id(str): the region id of instance. db_name(str): the db name for execute sql. sql(str): the sql to be executed. Returns: the sql result. """ try: async with DBService(region_id, dbinstance_id, db_name) as service: return await service.execute_sql(sql=sql) except Exception as e: logger.error(f"Error occurred: {str(e)}") raise e
  • The DBConn.execute_sql method, which performs the actual SQL execution and formats results as JSON for the query_sql tool.
    def execute_sql(self, sql): cursor = self.conn.cursor() cursor.execute(sql) columns = [desc[0] for desc in cursor.description] rows = cursor.fetchall() if self.dbtype == 'mysql': result = [dict(row) for row in rows] elif self.dbtype == 'postgresql' or self.dbtype == 'pg': result = [dict(zip(columns, row)) for row in rows] elif self.dbtype == 'sqlserver': result = [dict(zip(columns, row)) for row in rows] else: result = [] cursor.close() try: return json.dumps(result, ensure_ascii=False) except Exception as e: return str(result)
  • The DBService context manager class used by query_sql. It fetches instance details, creates a temporary read-only account if necessary, grants privileges, establishes DB connection, executes SQL via DBConn, and cleans up afterward.
    class DBService: """ Create a read-only account, execute the SQL statements, and automatically delete the account afterward. """ def __init__(self, region_id, instance_id, database=None, ): self.instance_id = instance_id self.database = database self.region_id = region_id self.__db_type = None self.__account_name, self.__account_password = get_rds_account() self.__host = None self.__port = None self.__client = get_rds_client(region_id) self.__db_conn = None async def __aenter__(self): await asyncio.to_thread(self._get_db_instance_info) if not self.__account_name or not self.__account_password: await asyncio.to_thread(self._create_temp_account) if self.database: await asyncio.to_thread(self._grant_privilege) else: self.account_name = self.__account_name self.account_password = self.__account_password self.__db_conn = DBConn(self) await asyncio.to_thread(self.__db_conn.connect) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.__db_conn is not None: await asyncio.to_thread(self.__db_conn.close) if not self.__account_name or not self.__account_password: await asyncio.to_thread(self._delete_account) self.__client = None def _get_db_instance_info(self): req = rds_20140815_models.DescribeDBInstanceAttributeRequest( dbinstance_id=self.instance_id, ) self.__client.describe_dbinstance_attribute(req) resp = self.__client.describe_dbinstance_attribute(req) self.db_type = resp.body.items.dbinstance_attribute[0].engine.lower() req = rds_20140815_models.DescribeDBInstanceNetInfoRequest( dbinstance_id=self.instance_id, ) resp = self.__client.describe_dbinstance_net_info(req) # 取支持的地址: vpc_host, vpc_port, public_host, public_port, dbtype = None, None, None, None, None net_infos = resp.body.dbinstance_net_infos.dbinstance_net_info for item in net_infos: if 'Private' == item.iptype: vpc_host = item.connection_string vpc_port = int(item.port) elif 'Public' in item.iptype: public_host = item.connection_string public_port = int(item.port) if vpc_host and test_connect(vpc_host, vpc_port): self.host = vpc_host self.port = vpc_port elif public_host and test_connect(public_host, public_port): self.host = public_host self.port = public_port else: raise Exception('connection db failed.') def _create_temp_account(self): self.account_name = 'mcp_' + random_str(10) self.account_password = random_password(32) request = rds_20140815_models.CreateAccountRequest( dbinstance_id=self.instance_id, account_name=self.account_name, account_password=self.account_password, account_description="Created by mcp for execute sql." ) self.__client.create_account(request) def _grant_privilege(self): req = rds_20140815_models.GrantAccountPrivilegeRequest( dbinstance_id=self.instance_id, account_name=self.account_name, dbname=self.database, account_privilege="ReadOnly" if self.db_type.lower() in ('mysql', 'postgresql') else "DBOwner" ) self.__client.grant_account_privilege(req) def _delete_account(self): if not self.account_name: return req = rds_20140815_models.DeleteAccountRequest( dbinstance_id=self.instance_id, account_name=self.account_name ) self.__client.delete_account(req) async def execute_sql(self, sql): return await asyncio.to_thread(self.__db_conn.execute_sql, sql) @property def user(self): return self.account_name @property def password(self): return self.account_password

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aliyun/alibabacloud-rds-openapi-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server