describe_sql_insight_statistic
Analyze SQL log statistics by querying execution times, cost times, and associated accounts within a specified timeframe for Alibaba Cloud RDS instances.
Instructions
Query SQL Log statistics, including SQL cost time, execution times, and account.
Args:
dbinstance_id (str): The ID of the RDS instance.
start_time(str): the start time of sql insight statistic. e.g. 2025-06-06 20:00:00
end_time(str): the end time of sql insight statistic. e.g. 2025-06-06 20:10:00
Returns:
the sql insight statistic information in csv format.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dbinstance_id | Yes | ||
| end_time | Yes | ||
| start_time | Yes |
Implementation Reference
- The handler function implementing the 'describe_sql_insight_statistic' MCP tool. It queries SQL insight statistics from Alibaba Cloud DAS service using the DescribeSqlInsightStatistic API, ordered by RT rate and count rate, and formats results as CSV.async def describe_sql_insight_statistic( dbinstance_id: str, start_time: str, end_time: str, ) -> Dict[str, Any]: """ Query SQL Log statistics, including SQL cost time, execution times, and account. Args: dbinstance_id (str): The ID of the RDS instance. start_time(str): the start time of sql insight statistic. e.g. 2025-06-06 20:00:00 end_time(str): the end time of sql insight statistic. e.g. 2025-06-06 20:10:00 Returns: the sql insight statistic information in csv format. """ def _descirbe(order_by: str): try: # Initialize client client = get_das_client() page_no = 1 page_size = 50 total = page_no * page_size + 1 result = [] while total > page_no * page_size: state = "RUNNING" job_id = "" while state == "RUNNING": body = { "InstanceId": dbinstance_id, "OrderBy": order_by, "Asc": False, "PageNo": 1, "PageSize": 10, "TemplateId": "", "DbName": "", "StartTime": convert_datetime_to_timestamp(start_time), "EndTime": convert_datetime_to_timestamp(end_time), "JobId": job_id } req = open_api_models.OpenApiRequest( query=OpenApiUtilClient.query({}), body=OpenApiUtilClient.parse_to_map(body) ) params = open_api_models.Params( action='DescribeSqlInsightStatistic', version='2020-01-16', protocol='HTTPS', pathname='/', method='POST', auth_type='AK', style='RPC', req_body_type='formData', body_type='json' ) response = client.call_api(params, req, util_models.RuntimeOptions()) response_data = response['body']['Data'] state = response_data['State'] if state == "RUNNING": job_id = response_data['ResultId'] time.sleep(1) continue if state == "SUCCESS": result.extend(response_data['Data']['List']) total = response_data['Data']['Total'] page_no = page_no + 1 return json_array_to_csv(result) except Exception as e: logger.error(f"Error occurred: {str(e)}") raise e rt_rate = _descirbe("rtRate") count_rate = _descirbe("countRate") return { "sql_log_order_by_rt_rate": rt_rate, "sql_log_order_by_count_rate": count_rate }