get_query
Retrieve the status and paginated results of an asynchronous database query.
Instructions
Get asynchronous query status and paginated results when complete.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
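| query_id | Yes | Identifier of the asynchronous query job to look up. | |
| offset | No | Zero-based index of the first row to return; must be >= 0. | 0 |
| limit | No | Maximum number of rows to return; must be >= 0. When omitted, all rows from `offset` onward are returned. | None |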
Implementation Reference
- sql_query_mcp/async_queries.py:141-180 (handler) Core implementation of get_query: validates offset/limit, retrieves the async query job, formats results with pagination, and audits the operation.
def get_query(
    self, query_id: str, offset: int = 0, limit: Optional[int] = None
) -> Dict[str, object]:
    """Return the status of an async query and, once it succeeded, a page of rows.

    Args:
        query_id: Key of the job to look up.
        offset: Index of the first row of the page; negative values are rejected.
        limit: Maximum number of rows in the page; ``None`` means no explicit
            cap, negative values are rejected.

    Returns:
        The dict built by ``_format_job_locked`` for the job.

    Raises:
        QueryExecutionError: For a negative ``offset``/``limit``, or for any
            other failure (re-raised as-is when it already is a
            ``QueryExecutionError``, otherwise wrapped with a sanitized
            message and chained to the original exception).
    """
    t0 = time.perf_counter()
    try:
        # Validation lives inside the try so rejected arguments are audited
        # as failures too. The messages are user-facing runtime strings.
        if offset < 0:
            raise QueryExecutionError("offset 必须大于等于 0。")
        if limit is not None:
            if int(limit) < 0:
                raise QueryExecutionError("limit 必须大于等于 0。")

        # NOTE(review): the original snippet was collapsed onto one line; the
        # lock is assumed to cover only the lookup, the formatting and the
        # copy of the fields needed for auditing -- confirm against the file.
        with self._lock:
            job = self._get_job_locked(query_id)
            payload = self._format_job_locked(job, offset, limit)
            summary = job.sql_summary
            engine_name = job.engine
            job_status = job.status

        # Only a string connection id is forwarded to the audit log.
        conn_id = payload.get("connection_id")
        if not isinstance(conn_id, str):
            conn_id = None

        self._audit.log(
            tool="get_query",
            connection_id=conn_id,
            success=True,
            duration_ms=_elapsed_ms(t0),
            sql_summary=summary,
            extra={"engine": engine_name, "status": job_status},
        )
        return payload
    except Exception as exc:
        # Record the failure with a sanitized message before propagating.
        safe_message = sanitize_error_message(str(exc))
        self._audit.log(
            tool="get_query",
            connection_id=None,
            success=False,
            duration_ms=_elapsed_ms(t0),
            error=safe_message,
            extra={"query_id": query_id},
        )
        if not isinstance(exc, QueryExecutionError):
            raise QueryExecutionError(safe_message) from exc
        raise

# - Helper _format_job_locked: formats a query job result dict, applying offset/limit pagination for succeeded queries.
def _format_job_locked(
    self, job: _AsyncQueryJob, offset: int, limit: Optional[int]
) -> Dict[str, object]:
    """Build the response dict for ``job``, paginating rows when it succeeded.

    The ``_locked`` suffix suggests the caller is expected to hold the lock
    guarding the job while this runs.

    Args:
        job: The job whose state is reported.
        offset: Index of the first row to include.
        limit: Maximum number of rows to include; ``None`` uses the total
            number of stored rows as the page size.

    Returns:
        Always ``query_id``, ``connection_id``, ``engine`` and ``status``;
        plus ``error`` for a failed job; plus the row page and its metadata
        for a succeeded job.
    """
    state = job.status
    summary: Dict[str, object] = {
        "query_id": job.query_id,
        "connection_id": job.connection_id,
        "engine": job.engine,
        "status": state,
    }

    if state == FAILED:
        summary["error"] = job.error
    if state != SUCCEEDED:
        # Nothing to paginate until the query has succeeded.
        return summary

    if limit is None:
        page_size = len(job.rows)
    else:
        page_size = int(limit)
    page = job.rows[offset : offset + page_size]

    summary["rows"] = page
    summary["columns"] = job.columns
    summary["row_count"] = job.row_count
    summary["truncated"] = job.truncated
    summary["duration_ms"] = job.duration_ms
    summary["applied_limit"] = job.applied_limit
    summary["offset"] = offset
    summary["returned_row_count"] = len(page)
    return summary

# - Helper _get_job_locked: retrieves a job by query_id from the internal dict, raising QueryExecutionError if not found.
def _get_job_locked(self, query_id: str) -> _AsyncQueryJob:
    """Look up the job stored under ``query_id`` in ``self._jobs``.

    Raises:
        QueryExecutionError: When no job is stored under ``query_id``; the
            original ``KeyError`` is attached as the cause.
    """
    try:
        found = self._jobs[query_id]
    except KeyError as missing:
        # User-facing message: reports the unknown query id.
        raise QueryExecutionError(f"未知 query_id: {query_id}") from missing
    return found

# - sql_query_mcp/app.py:99-103 (registration) Registration of the get_query MCP tool: decorated with @mcp.tool(), delegates to async_queries.get_query(query_id, offset, limit).
@mcp.tool()
def get_query(query_id: str, offset: int = 0, limit: Optional[int] = None) -> dict:
    """Get asynchronous query status and paginated results when complete."""

    # The docstring above is left untouched: it matches the tool's published
    # instructions, so it is treated as runtime text rather than a comment.
    def _invoke():
        # Deferred call so _run_tool decides when (and how) it is executed.
        return async_queries.get_query(query_id, offset, limit)

    return _run_tool(_invoke)

# - Schema/parameters of get_query: query_id (str), offset (int, default 0), limit (Optional[int]). Returns Dict[str, object] with query status and paginated rows.
def get_query( self, query_id: str, offset: int = 0, limit: Optional[int] = None ) -> Dict[str, object]: