Skip to main content
Glama
aliyun

Alibaba Cloud RDS OpenAPI MCP Server

Official
by aliyun

query_sql

Execute read-only SQL queries on Alibaba Cloud RDS databases to retrieve data or show database information using instance ID, region, and database name.

Instructions

Execute read-only SQL such as SHOW xxx or SELECT xxx.
Args:
    dbinstance_id (str): The ID of the RDS instance.
    region_id (str): The region ID of the instance.
    db_name (str): The name of the database in which to execute the SQL.
    sql (str): The SQL statement to be executed.
Returns:
    the sql result.

Input Schema

TableJSON Schema
Name | Required | Description | Default
region_id | Yes | |
dbinstance_id | Yes | |
db_name | Yes | |
sql | Yes | |

Implementation Reference

  • The core handler function for the 'query_sql' MCP tool. It creates a temporary read-only database account if needed, connects to the specified RDS instance and database, executes the provided read-only SQL query, and returns the results as JSON.
    async def query_sql(
            region_id: str,
            dbinstance_id: str,
            db_name: str,
            sql: str
    ) -> str:
        """
        execute read-only sql likes show xxx, select xxx
        Args:
            dbinstance_id (str): The ID of the RDS instance.
            region_id(str): the region id of instance.
            db_name(str): the db name for execute sql.
            sql(str): the sql to be executed.
        Returns:
            the sql result.
        """
        # NOTE(review): the docstring above is left untouched on purpose; it is
        # presumably published as the tool description, so rewording it would
        # change what clients see -- confirm before editing.
        try:
            # DBService is an async context manager: entering it prepares the
            # connection for (region, instance, database); leaving it tears
            # that down again, whether or not the statement succeeded.
            async with DBService(region_id, dbinstance_id, db_name) as db:
                outcome = await db.execute_sql(sql=sql)
                return outcome
        except Exception as exc:
            # Log the failure, then hand the same exception back to the caller.
            logger.error(f"Error occurred: {str(exc)}")
            raise exc
  • The DBConn.execute_sql method, which performs the actual SQL execution and formats results as JSON for the query_sql tool.
    def execute_sql(self, sql):
        """
        Run ``sql`` on the open connection and return the result set as text.

        Args:
            sql: The SQL statement passed unchanged to ``cursor.execute``.

        Returns:
            A JSON array of objects, one per row. For ``mysql`` each row is
            converted with ``dict(row)``; for ``postgresql``/``pg``/``sqlserver``
            column names from ``cursor.description`` are zipped with the row
            values. Any other ``dbtype``, or a statement that yields no result
            set, produces ``"[]"``. If the rows cannot be JSON-encoded, the
            Python ``str()`` of the list is returned instead.

        Raises:
            Whatever the underlying driver raises from ``execute``/``fetchall``.
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql)
            if cursor.description is None:
                # No result set (DB-API sets description to None): there is
                # nothing to fetch, and iterating None would raise TypeError.
                columns, rows = [], []
            else:
                columns = [desc[0] for desc in cursor.description]
                rows = cursor.fetchall()
        finally:
            # Release the cursor even when execute()/fetchall() fails.
            cursor.close()

        if self.dbtype == 'mysql':
            # assumes the MySQL cursor yields mapping-like rows -- TODO confirm
            result = [dict(row) for row in rows]
        elif self.dbtype in ('postgresql', 'pg', 'sqlserver'):
            result = [dict(zip(columns, row)) for row in rows]
        else:
            result = []

        try:
            return json.dumps(result, ensure_ascii=False)
        except Exception:
            # Best-effort fallback for values json cannot encode.
            return str(result)
  • The DBService context manager class used by query_sql. It fetches instance details, creates a temporary read-only account if necessary, grants privileges, establishes DB connection, executes SQL via DBConn, and cleans up afterward.
    class DBService:
        """
        Run SQL against an RDS instance, using a temporary account when needed.

        Intended to be used as an async context manager. On entry it looks up
        the instance's engine and a reachable endpoint, then either reuses the
        credentials returned by ``get_rds_account()`` or creates a temporary
        account (granting it access to ``database`` when one is given), and
        finally opens a ``DBConn``. On exit it closes the connection and deletes
        the temporary account if one was created.

        Attributes read by collaborators (e.g. ``DBConn(self)``):
            instance_id, region_id, database: constructor arguments.
            db_type: lower-cased engine name of the instance.
            host, port: endpoint selected by ``_get_db_instance_info``.
            account_name, account_password: credentials used to connect
                (also exposed as ``user`` / ``password``).
        """

        def __init__(self,
                     region_id,
                     instance_id,
                     database=None, ):
            self.instance_id = instance_id
            self.database = database
            self.region_id = region_id

            # Populated by _get_db_instance_info().
            self.db_type = None
            self.host = None
            self.port = None

            # Credentials actually used for the connection; set in __aenter__
            # or _create_temp_account(). Initialised so cleanup never hits an
            # AttributeError.
            self.account_name = None
            self.account_password = None

            # Credentials supplied by get_rds_account(); when either is empty a
            # temporary account is created instead.
            self.__account_name, self.__account_password = get_rds_account()
            # True only after create_account succeeded, so cleanup never tries
            # to delete an account that was never created.
            self.__temp_account_created = False
            self.__client = get_rds_client(region_id)
            self.__db_conn = None

        async def __aenter__(self):
            """Resolve the endpoint, prepare credentials and open the connection.

            If anything fails after a temporary account has been created, the
            account is deleted before the error propagates, because
            ``__aexit__`` is not invoked when ``__aenter__`` raises.
            """
            await asyncio.to_thread(self._get_db_instance_info)
            try:
                if not self.__account_name or not self.__account_password:
                    await asyncio.to_thread(self._create_temp_account)
                    if self.database:
                        await asyncio.to_thread(self._grant_privilege)
                else:
                    self.account_name = self.__account_name
                    self.account_password = self.__account_password
                conn = DBConn(self)
                await asyncio.to_thread(conn.connect)
                # Only remember the connection once it is actually open.
                self.__db_conn = conn
            except BaseException:
                await self._release()
                raise
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            """Close the connection and remove the temporary account, if any."""
            await self._release()

        async def _release(self):
            """Tear down connection, temporary account and client, in that order.

            Each step runs even if the previous one raised, so a failing
            ``close()`` cannot leave a temporary account behind.
            """
            conn, self.__db_conn = self.__db_conn, None
            try:
                if conn is not None:
                    await asyncio.to_thread(conn.close)
            finally:
                try:
                    if self.__temp_account_created:
                        await asyncio.to_thread(self._delete_account)
                        self.__temp_account_created = False
                finally:
                    self.__client = None

        def _get_db_instance_info(self):
            """Fetch the engine type and choose a reachable host/port.

            Sets ``db_type``, ``host`` and ``port``.

            Raises:
                Exception: if neither the private nor the public endpoint
                    passes ``test_connect``.
            """
            req = rds_20140815_models.DescribeDBInstanceAttributeRequest(
                dbinstance_id=self.instance_id,
            )
            resp = self.__client.describe_dbinstance_attribute(req)
            self.db_type = resp.body.items.dbinstance_attribute[0].engine.lower()

            req = rds_20140815_models.DescribeDBInstanceNetInfoRequest(
                dbinstance_id=self.instance_id,
            )
            resp = self.__client.describe_dbinstance_net_info(req)

            # Pick a usable address: collect the private and public endpoints.
            vpc_host, vpc_port, public_host, public_port = None, None, None, None
            net_infos = resp.body.dbinstance_net_infos.dbinstance_net_info
            for item in net_infos:
                if 'Private' == item.iptype:
                    vpc_host = item.connection_string
                    vpc_port = int(item.port)
                elif 'Public' in item.iptype:
                    public_host = item.connection_string
                    public_port = int(item.port)

            # Prefer the private endpoint; fall back to the public one.
            # test_connect presumably probes reachability -- defined elsewhere.
            if vpc_host and test_connect(vpc_host, vpc_port):
                self.host = vpc_host
                self.port = vpc_port
            elif public_host and test_connect(public_host, public_port):
                self.host = public_host
                self.port = public_port
            else:
                raise Exception('connection db failed.')

        def _create_temp_account(self):
            """Create a randomly named account on the instance for this session."""
            self.account_name = 'mcp_' + random_str(10)
            self.account_password = random_password(32)
            request = rds_20140815_models.CreateAccountRequest(
                dbinstance_id=self.instance_id,
                account_name=self.account_name,
                account_password=self.account_password,
                account_description="Created by mcp for execute sql."
            )
            self.__client.create_account(request)
            self.__temp_account_created = True

        def _grant_privilege(self):
            """Grant the temporary account access to ``self.database``."""
            # NOTE(review): engines other than mysql/postgresql receive
            # "DBOwner", which is not a read-only privilege -- confirm this is
            # intended given the class is meant to use a read-only account.
            req = rds_20140815_models.GrantAccountPrivilegeRequest(
                dbinstance_id=self.instance_id,
                account_name=self.account_name,
                dbname=self.database,
                account_privilege="ReadOnly" if self.db_type.lower() in ('mysql', 'postgresql') else "DBOwner"
            )
            self.__client.grant_account_privilege(req)

        def _delete_account(self):
            """Delete the account named ``self.account_name``; no-op if unset."""
            if not self.account_name:
                return
            req = rds_20140815_models.DeleteAccountRequest(
                dbinstance_id=self.instance_id,
                account_name=self.account_name
            )
            self.__client.delete_account(req)

        async def execute_sql(self, sql):
            """Run ``sql`` through the open ``DBConn`` in a worker thread."""
            return await asyncio.to_thread(self.__db_conn.execute_sql, sql)

        @property
        def user(self):
            """Account name used for the database connection."""
            return self.account_name

        @property
        def password(self):
            """Account password used for the database connection."""
            return self.account_password

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aliyun/alibabacloud-rds-openapi-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.