execute_sql
Execute SQL queries in GreptimeDB using MySQL dialect to retrieve, analyze, and manage data efficiently through a secure Model Context Protocol server interface.
Instructions
Execute SQL query against GreptimeDB. Please use MySQL dialect when generating SQL queries.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | The SQL query to execute (using MySQL dialect) | |
Implementation Reference
# The main handler function for tool calls, which executes the SQL query for
# the 'execute_sql' tool after validation and security checks.
async def call_tool(self, name: str, arguments: dict) -> list[TextContent]:
    """Execute the ``execute_sql`` tool and return its output as text.

    Args:
        name: Tool name requested by the client; only ``"execute_sql"`` is
            accepted.
        arguments: Tool arguments; the SQL text is read from the ``"query"``
            key.

    Returns:
        A single-element list holding a text ``TextContent``: the formatted
        result set, a "rows affected" message, a security-gate rejection, or
        an error message when executing the query raises ``Error``.

    Raises:
        ValueError: If ``name`` is not ``"execute_sql"`` or the query is
            missing or empty.
    """
    logger = self.logger
    config = self.db_config

    logger.info(f"Calling tool: {name} with arguments: {arguments}")

    if name != "execute_sql":
        raise ValueError(f"Unknown tool: {name}")

    query = arguments.get("query")
    if not query:
        raise ValueError("Query is required")

    def reply(body: str) -> list[TextContent]:
        # Every outcome is reported as one text content item.
        return [TextContent(type="text", text=body)]

    # Refuse the query before opening a connection if the gate flags it.
    blocked, why = security_gate(query=query)
    if blocked:
        return reply("Error: Contain dangerous operations, reason:" + why)

    read_prefixes = ("SELECT", "SHOW", "DESC", "TQL", "EXPLAIN")

    try:
        with connect(**config) as conn, conn.cursor() as cursor:
            cursor.execute(query)
            statement = query.strip().upper()

            # SHOW DATABASES: fixed header followed by the first column.
            if statement.startswith("SHOW DATABASES"):
                db_rows = cursor.fetchall()
                listing = ["Databases"]
                listing += [row[0] for row in db_rows]
                return reply("\n".join(listing))

            # SHOW TABLES: header is derived from the configured database.
            if statement.startswith("SHOW TABLES"):
                table_rows = cursor.fetchall()
                listing = ["Tables_in_" + config["database"]]
                listing += [row[0] for row in table_rows]
                return reply("\n".join(listing))

            # Result-producing statements: CSV-like header plus data lines.
            if statement.startswith(read_prefixes):
                header = ",".join([col[0] for col in cursor.description])
                data_lines = [
                    ",".join(map(str, record)) for record in cursor.fetchall()
                ]
                return reply("\n".join([header] + data_lines))

            # Anything else is treated as a write: commit and report count.
            conn.commit()
            return reply(
                f"Query executed successfully. Rows affected: {cursor.rowcount}"
            )
    except Error as e:
        logger.error(f"Error executing SQL '{query}': {e}")
        return reply(f"Error executing query: {str(e)}")
# src/greptimedb_mcp_server/server.py:171-192 (registration)
# Registers the 'execute_sql' tool, including its name, description, and
# input schema.
async def list_tools(self) -> list[Tool]:
    """Return the tools this server exposes.

    Returns:
        A one-element list describing the ``execute_sql`` tool, whose input
        schema is an object with a single required string property ``query``.
    """
    log = self.logger
    log.info("Listing tools...")

    # JSON Schema for the tool arguments, assembled from the inside out.
    query_property = {
        "type": "string",
        "description": "The SQL query to execute (using MySQL dialect)",
    }
    input_schema = {
        "type": "object",
        "properties": {"query": query_property},
        "required": ["query"],
    }

    execute_sql_tool = Tool(
        name="execute_sql",
        description="Execute SQL query against GreptimeDB. Please use MySQL dialect when generating SQL queries.",
        inputSchema=input_schema,
    )
    return [execute_sql_tool]
- The input schema definition for the 'execute_sql' tool, specifying the required 'query' parameter.inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The SQL query to execute (using MySQL dialect)", } }, "required": ["query"], }, ) ]
# Helper function used in the handler to detect and block dangerous SQL
# operations like DROP, DELETE, etc.
def security_gate(query: str) -> tuple[bool, str]:
    """
    Simple security check for SQL queries.

    Each forbidden pattern is tested against two views of the query:

    * the comment-stripped, whitespace-normalized text, so that statements
      separated only by a comment are still seen as multiple statements;
    * the whitespace-normalized raw text, so that a keyword cannot be hidden
      inside something the comment stripper removes but the database may
      still act on (a ``--`` or ``/* */`` sequence inside a string literal,
      or a MySQL executable comment ``/*! ... */``).

    The raw-text check is deliberately conservative: a forbidden keyword that
    appears only inside a genuine comment is rejected as well.

    Args:
        query: The SQL query to check

    Returns:
        tuple: A boolean indicating if the query is dangerous, and a reason message
    """
    if not query or not query.strip():
        return True, "Empty query not allowed"

    # View 1: raw text, only whitespace-normalized and upper-cased.
    raw_query = re.sub(r"\s+", " ", query).strip().upper()

    # View 2: comments removed, then normalized the same way.
    clean_query = re.sub(r"/\*.*?\*/", " ", query, flags=re.DOTALL)  # Remove /* */
    clean_query = re.sub(r"--.*", "", clean_query)  # Remove --
    clean_query = re.sub(r"\s+", " ", clean_query).strip().upper()  # Normalize spaces

    # Checked in order; the first match determines the reported reason.
    dangerous_patterns = (
        (r"\bDROP\b", "Forbided `DROP` operation"),
        (r"\bDELETE\b", "Forbided `DELETE` operation"),
        (r"\bREVOKE\b", "Forbided `REVOKE` operation"),
        (r"\bTRUNCATE\b", "Forbided `TRUNCATE` operation"),
        (r"\bUPDATE\b", "Forbided `UPDATE` operation"),
        (r"\bINSERT\b", "Forbided `INSERT` operation"),
        (r"\bALTER\b", "Forbided `ALTER` operation"),
        (r"\bCREATE\b", "Forbided `CREATE` operation"),
        (r"\bGRANT\b", "Forbided `GRANT` operation"),
        (r";\s*\w+", "Forbided multiple statements"),
    )

    for pattern, reason in dangerous_patterns:
        # The stripper is regex-based and not quote-aware, so its output
        # alone cannot be trusted; a hit in either view rejects the query.
        if re.search(pattern, clean_query) or re.search(pattern, raw_query):
            logger.warning(f"Dangerous pattern detected: {query[:50]}...")
            return True, reason

    return False, ""