query_sql
Execute read-only SQL queries on Alibaba Cloud RDS instances to retrieve data or view database information, specifying the instance ID, region, database name, and SQL statement for precise results.
Instructions
Execute read-only SQL statements such as SHOW xxx or SELECT xxx.
Args:
dbinstance_id (str): The ID of the RDS instance.
region_id (str): The region ID of the instance.
db_name (str): The name of the database in which to execute the SQL.
sql (str): The SQL statement to execute.
Returns:
The SQL result, serialized as a JSON string.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
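| db_name | Yes | The name of the database in which to execute the SQL. | |
| dbinstance_id | Yes | The ID of the RDS instance. | |
| region_id | Yes | The region ID of the instance. | |
| sql | Yes | The SQL statement to execute. | |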
Input Schema (JSON Schema)
    {
      "properties": {
        "db_name": {
          "title": "Db Name",
          "type": "string"
        },
        "dbinstance_id": {
          "title": "Dbinstance Id",
          "type": "string"
        },
        "region_id": {
          "title": "Region Id",
          "type": "string"
        },
        "sql": {
          "title": "Sql",
          "type": "string"
        }
      },
      "required": [
        "region_id",
        "dbinstance_id",
        "db_name",
        "sql"
      ],
      "title": "query_sqlArguments",
      "type": "object"
    }
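As an illustration of how a caller might pre-check arguments against the schema above before invoking the tool, here is a minimal sketch using only the standard library. The helper name `check_arguments` and the example values (region, instance ID, database name) are hypothetical placeholders, not part of the server; a full JSON Schema validator would be more thorough.

```python
# Fields and required list copied from the input schema above (titles omitted).
QUERY_SQL_SCHEMA = {
    "properties": {
        "db_name": {"type": "string"},
        "dbinstance_id": {"type": "string"},
        "region_id": {"type": "string"},
        "sql": {"type": "string"},
    },
    "required": ["region_id", "dbinstance_id", "db_name", "sql"],
}


def check_arguments(arguments):
    """Hypothetical helper: return a list of problems; empty means the arguments pass."""
    problems = []
    for name in QUERY_SQL_SCHEMA["required"]:
        if name not in arguments:
            problems.append(f"missing required argument: {name}")
        elif not isinstance(arguments[name], str):
            problems.append(f"argument must be a string: {name}")
    return problems


# Placeholder values; substitute a real region, instance ID, and database name.
example_arguments = {
    "region_id": "cn-hangzhou",
    "dbinstance_id": "rm-xxxxxxxx",
    "db_name": "mydb",
    "sql": "SELECT 1",
}

print(check_arguments(example_arguments))
print(check_arguments({"sql": 1}))
```

The check only covers presence and string type, which is all the schema itself constrains; it does not inspect whether the SQL is actually read-only.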
Implementation Reference
- The core handler function for the 'query_sql' MCP tool. It uses a context-managed DBService to establish a temporary read-only database connection to the specified RDS instance and executes the provided SQL query, returning the results as JSON.

      @mcp.tool(annotations=READ_ONLY_TOOL)
      async def query_sql(
              region_id: str,
              dbinstance_id: str,
              db_name: str,
              sql: str
      ) -> str:
          """
          execute read-only sql likes show xxx, select xxx
          Args:
              dbinstance_id (str): The ID of the RDS instance.
              region_id(str): the region id of instance.
              db_name(str): the db name for execute sql.
              sql(str): the sql to be executed.
          Returns:
              the sql result.
          """
          try:
              async with DBService(region_id, dbinstance_id, db_name) as service:
                  return await service.execute_sql(sql=sql)
          except Exception as e:
              logger.error(f"Error occurred: {str(e)}")
              raise e
- src/alibabacloud_rds_openapi_mcp_server/server.py:1627-1627 (registration): The decorator that registers the query_sql function as an MCP tool with a read-only hint.

      @mcp.tool(annotations=READ_ONLY_TOOL)
- DBService.__aenter__ and __aexit__ methods: __aenter__ establishes the connection by fetching instance info, creating a temporary read-only account if no credentials were supplied, granting privileges on the database, and connecting via DBConn. __aexit__ closes the connection and deletes the temporary account.

      async def __aenter__(self):
          await asyncio.to_thread(self._get_db_instance_info)
          if not self.__account_name or not self.__account_password:
              await asyncio.to_thread(self._create_temp_account)
              if self.database:
                  await asyncio.to_thread(self._grant_privilege)
          else:
              self.account_name = self.__account_name
              self.account_password = self.__account_password
          self.__db_conn = DBConn(self)
          await asyncio.to_thread(self.__db_conn.connect)
          return self

      async def __aexit__(self, exc_type, exc_val, exc_tb):
          if self.__db_conn is not None:
              await asyncio.to_thread(self.__db_conn.close)
          if not self.__account_name or not self.__account_password:
              await asyncio.to_thread(self._delete_account)
          self.__client = None
- DBService.execute_sql: Delegates SQL execution to the underlying DBConn instance.

      return await asyncio.to_thread(self.__db_conn.execute_sql, sql)
- DBConn.execute_sql: Performs the actual SQL execution, fetches results as dicts, and serializes them to a JSON string.

      def execute_sql(self, sql):
          cursor = self.conn.cursor()
          cursor.execute(sql)
          columns = [desc[0] for desc in cursor.description]
          rows = cursor.fetchall()
          if self.dbtype == 'mysql':
              result = [dict(row) for row in rows]
          elif self.dbtype == 'postgresql' or self.dbtype == 'pg':
              result = [dict(zip(columns, row)) for row in rows]
          elif self.dbtype == 'sqlserver':
              result = [dict(zip(columns, row)) for row in rows]
          else:
              result = []
          cursor.close()
          try:
              return json.dumps(result, ensure_ascii=False)
          except Exception as e:
              return str(result)
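
The overall pattern in the references above (set up a temporary account on entry, run blocking database calls in a worker thread, serialize rows to JSON, clean up on exit) can be tried locally with a rough sketch. Everything here is an assumption for illustration: `SketchDBService` is a hypothetical stand-in, an in-memory SQLite database replaces the RDS connection, and the account steps are simulated by recording events rather than calling any Alibaba Cloud API.

```python
import asyncio
import json
import sqlite3


class SketchDBService:
    """Hypothetical stand-in for DBService; SQLite replaces the RDS connection."""

    def __init__(self):
        self.events = []  # records lifecycle steps so they can be inspected
        self.conn = None

    def _create_temp_account(self):
        # Simulated: the real service would create a temporary account here.
        self.events.append("create_temp_account")

    def _delete_account(self):
        # Simulated: the real service would delete the temporary account here.
        self.events.append("delete_account")

    def _connect(self):
        # check_same_thread=False because asyncio.to_thread may run each
        # call on a different worker thread.
        self.conn = sqlite3.connect(":memory:", check_same_thread=False)
        self.conn.execute("CREATE TABLE t (id INTEGER, name TEXT)")
        self.conn.execute("INSERT INTO t VALUES (1, 'alice'), (2, 'Zoë')")

    def _execute_sql(self, sql):
        cursor = self.conn.cursor()
        cursor.execute(sql)
        # Pair column names with each row tuple, as in the zip-based branches.
        columns = [desc[0] for desc in cursor.description]
        result = [dict(zip(columns, row)) for row in cursor.fetchall()]
        cursor.close()
        # ensure_ascii=False keeps non-ASCII text readable in the output.
        return json.dumps(result, ensure_ascii=False)

    async def __aenter__(self):
        await asyncio.to_thread(self._create_temp_account)
        await asyncio.to_thread(self._connect)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.conn is not None:
            await asyncio.to_thread(self.conn.close)
        await asyncio.to_thread(self._delete_account)

    async def execute_sql(self, sql):
        return await asyncio.to_thread(self._execute_sql, sql)


async def run_query(sql):
    service = SketchDBService()
    async with service:
        payload = await service.execute_sql(sql)
    return payload, service.events


if __name__ == "__main__":
    print(asyncio.run(run_query("SELECT id, name FROM t ORDER BY id")))
```

Because the cleanup lives in `__aexit__`, the simulated account is removed even when the query raises, which is the main reason to wrap the connection in an async context manager.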