collect_query_dump_and_profile
Execute a query and retrieve its dump and profile. The large output requires specialized tools for further processing.
Instructions
Run a query to get it's query dump and profile, output very large, need special tools to do further processing
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | query to execute | |
| db | No | database |
Implementation Reference
- The tool handler function 'collect_query_dump_and_profile' decorated with @mcp.tool. It calls db_client.collect_perf_analysis_input() and returns a ToolResult with status and structured content.
@mcp.tool(description="Run a query to get it's query dump and profile, output very large, need special tools to do further processing") def collect_query_dump_and_profile( query: Annotated[str, Field(description="query to execute")], db: Annotated[str|None, Field(description="database")] = None ) -> ToolResult: logger.info(f"Collecting query dump and profile for query: {query[:100]}{'...' if len(query) > 100 else ''}") result : PerfAnalysisInput = db_client.collect_perf_analysis_input(query, db=db) if result.get('error_message'): status = f"collecting query dump and profile failed, query_id={result.get('query_id')} error_message={result.get('error_message')}" logger.warning(status) else: status = f"collecting query dump and profile succeeded, but it's only for user/tool, not for AI, query_id={result.get('query_id')}" logger.info(status) return ToolResult( content=[TextContent(type='text', text=status)], structured_content=result, ) - src/mcp_server_starrocks/server.py:237-237 (registration)The @mcp.tool decorator registers 'collect_query_dump_and_profile' as an MCP tool with the description 'Run a query to get its query dump and profile, output very large, need special tools to do further processing'.
@mcp.tool(description="Run a query to get it's query dump and profile, output very large, need special tools to do further processing") - The 'collect_perf_analysis_input' method on DBClient that executes the query, retrieves query dump, profile, and analyze profile, returning a PerfAnalysisInput dict.
def collect_perf_analysis_input(self, query: str, db:Optional[str]=None) -> PerfAnalysisInput: conn = None try: conn = self._get_connection() # Switch database if specified if db and db != self.default_database: cursor_temp = conn.cursor() try: cursor_temp.execute(f"USE `{db}`") except (MySQLError, adbcError) as db_err: return {"error_message":str(db_err)} finally: cursor_temp.close() query_dump_result = self._execute(conn, "select get_query_dump(%s, %s)", (query, False)) if not query_dump_result.success: return {"error_message":query_dump_result.error_message} ret = { "query_dump": json.loads(query_dump_result.rows[0][0]), } start_ts = time.time() profile_query = "/*+ SET_VAR (enable_profile='true') */ " + query query_result = self._execute(conn, profile_query) duration = time.time() - start_ts ret["duration"] = duration if not query_result.success: ret["error_message"] = query_result.error_message return ret ret["rows_returned"] = len(query_result.rows) if query_result.rows else 0 # Try to get query id query_id_result = self._execute(conn, "select last_query_id()") if not query_id_result.success: ret["error_message"] = query_id_result.error_message return ret ret["query_id"] = query_id_result.rows[0][0] # Try to get query profile with retries query_profile = '' retry_count = 0 while not query_profile and retry_count < 3: time.sleep(1+retry_count) query_profile_result = self._execute(conn,"select get_query_profile(%s)", (ret["query_id"],)) if query_profile_result.success: query_profile = query_profile_result.rows[0][0] retry_count += 1 if not query_profile: ret['error_message'] = "Failed to get query profile after 3 retries" return ret ret['profile'] = query_profile analyze_profile_result = self._execute(conn,"ANALYZE PROFILE FROM %s", (ret["query_id"],)) if not analyze_profile_result.success: ret["error_message"] = analyze_profile_result.error_message return ret analyze_text = '\n'.join(row[0] for row in analyze_profile_result.rows) ret['analyze_profile'] = remove_ansi_codes(analyze_text) return ret except (MySQLError, adbcError) as e: self._handle_db_error(e) return {"error_message":str(e)} except Exception as e: return {"error_message":str(e)} finally: if conn and not self.enable_arrow_flight_sql: try: conn.close() except: pass - The 'PerfAnalysisInput' TypedDict defining the schema for the structured result returned by collect_perf_analysis_input.
class PerfAnalysisInput(TypedDict): error_message: NotRequired[Optional[str]] query_id: NotRequired[Optional[str]] rows_returned: NotRequired[Optional[int]] duration: NotRequired[Optional[float]] query_dump: NotRequired[Optional[dict]] profile: NotRequired[Optional[str]] analyze_profile: NotRequired[Optional[str]] - The 'remove_ansi_codes' helper used to strip ANSI escape codes from the ANALYZE PROFILE output text.
def remove_ansi_codes(text): return ANSI_ESCAPE_PATTERN.sub('', text)