read-query
Execute SELECT SQL queries on Apache Pinot databases via the StarTree MCP Server to retrieve and analyze data efficiently.
Instructions
Execute a SELECT query on the Pinot database
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | SELECT SQL query to execute | |
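
As a rough illustration of the single `query` argument, the sketch below mirrors the prefix check that the handler listed under Implementation Reference applies before running anything. The function name `is_allowed_read_query` and the table name `myTable` are hypothetical and only serve the example. Because the check is a plain prefix test on the trimmed, upper-cased text, a statement that begins with `WITH` is rejected even if it ends in a SELECT.

```python
def is_allowed_read_query(query: str) -> bool:
    """Mirror the handler's guard: trimmed, upper-cased text must start with SELECT."""
    return query.strip().upper().startswith("SELECT")


# Arguments object a client might send to the tool (table name is hypothetical).
arguments = {"query": "SELECT COUNT(*) FROM myTable"}

print(is_allowed_read_query(arguments["query"]))                       # True
print(is_allowed_read_query("  select * from myTable limit 5"))        # True
print(is_allowed_read_query("DROP TABLE myTable"))                     # False
print(is_allowed_read_query("WITH t AS (SELECT 1) SELECT * FROM t"))   # False
```

In the handler itself a rejected query does not raise to the caller; the `ValueError` is caught and returned as an `Error: ...` string.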
Implementation Reference
- mcp_pinot/server.py:58-68 (handler) The core handler function for the 'read-query' MCP tool. It validates that the query is a SELECT statement, executes the query using the PinotClient instance, formats the results as indented JSON, and handles errors.

      @mcp.tool
      def read_query(query: str) -> str:
          """Execute a SELECT query on the Pinot database"""
          try:
              if not query.strip().upper().startswith("SELECT"):
                  raise ValueError("Only SELECT queries are allowed for read-query")
              results = pinot_client.execute_query(query=query)
              return json.dumps(results, indent=2)
          except Exception as e:
              return f"Error: {str(e)}"

- mcp_pinot/pinot_client.py:240-261 (helper) Supporting method in PinotClient that implements the actual query execution. Prefers HTTP POST to the Pinot broker endpoint, with fallback to the pinotdb driver. Preprocesses query and handles timeouts.

      def execute_query(
          self,
          query: str,
          params: dict[str, Any] | None = None,
      ) -> list[dict[str, Any]]:
          logger.debug(f"Executing query: {query[:100]}...")  # Log first 100 chars

          # Use HTTP as primary method since it works reliably with authenticated clusters
          try:
              return self.execute_query_http(query)
          except Exception as e:
              logger.warning(f"HTTP query failed: {e}, trying PinotDB fallback")
              try:
                  return self.execute_query_pinotdb(query, params)
              except Exception as pinotdb_error:
                  error_msg = (
                      f"Both HTTP and PinotDB queries failed. "
                      f"HTTP: {e}, PinotDB: {pinotdb_error}"
                  )
                  logger.error(error_msg)
                  raise

- mcp_pinot/pinot_client.py:210-239 (helper) HTTP-based query execution helper called by execute_query. Sends a POST to the Pinot broker /query/sql endpoint with authentication and parses resultTable into a list of dicts.

      def execute_query_http(self, query: str) -> list[dict[str, Any]]:
          """Alternative query execution using HTTP requests directly to broker"""
          broker_url = f"{self.config.broker_scheme}://{self.config.broker_host}:{self.config.broker_port}/{PinotEndpoints.QUERY_SQL}"
          logger.debug(f"Executing query via HTTP: {query[:100]}...")

          payload = {
              "sql": query,
              "queryOptions": f"timeoutMs={self.config.query_timeout * 1000}",
          }

          response = self.http_request(broker_url, "POST", payload)
          result_data = response.json()

          # Check for query errors in response
          if "exceptions" in result_data and result_data["exceptions"]:
              raise Exception(f"Query error: {result_data['exceptions']}")

          # Parse the result into pandas-like format
          if "resultTable" in result_data:
              columns = result_data["resultTable"]["dataSchema"]["columnNames"]
              rows = result_data["resultTable"]["rows"]

              # Convert to list of dictionaries
              result = [dict(zip(columns, row)) for row in rows]
              logger.debug(f"HTTP query returned {len(result)} rows")
              return result
          else:
              logger.warning("No resultTable in response, returning empty result")
              return []

- mcp_pinot/server.py:58-58 (registration) FastMCP decorator that registers the read_query function as an MCP tool, automatically generating the schema from its signature and exposing it as 'read-query'.

      @mcp.tool

- mcp_pinot/prompts.py:31-32 (helper) Prompt template that describes the 'read-query' tool for the AI assistant, aiding in tool usage.

      1. read-query: Execute a SQL query on Pinot and return the results
      2. list-tables: List all available tables in Pinot
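
To make the result shape concrete, here is a minimal standalone sketch of the response handling described for `execute_query_http` above: it raises when the broker reports exceptions, zips column names with each row, and returns an empty list when no `resultTable` is present. The function name `rows_from_response` and the sample payload are assumptions made for illustration, not part of the server code. The final `json.dumps(..., indent=2)` call matches how the handler formats what it returns.

```python
import json
from typing import Any


def rows_from_response(result_data: dict[str, Any]) -> list[dict[str, Any]]:
    """Turn a broker-style JSON response into a list of row dictionaries."""
    # A non-empty "exceptions" list signals a failed query.
    if "exceptions" in result_data and result_data["exceptions"]:
        raise RuntimeError(f"Query error: {result_data['exceptions']}")

    # No resultTable means there is nothing to convert.
    if "resultTable" not in result_data:
        return []

    columns = result_data["resultTable"]["dataSchema"]["columnNames"]
    rows = result_data["resultTable"]["rows"]
    # Pair each value with its column name.
    return [dict(zip(columns, row)) for row in rows]


# Hypothetical response payload, shaped like the fields the helper reads.
sample_response = {
    "resultTable": {
        "dataSchema": {"columnNames": ["city", "total"]},
        "rows": [["Berlin", 12], ["Lima", 7]],
    },
    "exceptions": [],
}

rows = rows_from_response(sample_response)
print(json.dumps(rows, indent=2))
```

Each row comes back as a dictionary keyed by column name, so a caller can read `rows[0]["city"]` without tracking column positions.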