run_steampipe_query
Execute SQL queries via Steampipe CLI and retrieve results as JSON. Designed for integration with MCP, enabling data retrieval and analysis from Steampipe.
Instructions
Executes a SQL query using the Steampipe CLI and returns the results as a JSON string.
Args: query: The SQL query to execute via Steampipe (e.g., "select login from github_user limit 1"). Ensure the query is valid Steampipe SQL.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes |
Implementation Reference
- steampipe_mcp_server.py:16-103 (handler)The main handler function for the 'run_steampipe_query' tool. It executes Steampipe SQL queries using subprocess, captures JSON output, parses it, and returns formatted JSON or error messages. Registered via @mcp.tool() decorator.@mcp.tool() def run_steampipe_query(query: str) -> str: """ Executes a SQL query using the Steampipe CLI and returns the results as a JSON string. Args: query: The SQL query to execute via Steampipe (e.g., "select login from github_user limit 1"). Ensure the query is valid Steampipe SQL. """ logger.info(f"Received request to run Steampipe query: {query}") command = ["steampipe", "query", query, "--output", "json"] try: # Execute the command # Set timeout to prevent hanging indefinitely (e.g., 60 seconds) result = subprocess.run( command, capture_output=True, text=True, check=False, # Don't raise exception on non-zero exit code, handle manually timeout=60 ) # Log stdout/stderr regardless of success for debugging if result.stdout: logger.info(f"Steampipe stdout:\n{result.stdout[:500]}...") # Log truncated stdout if result.stderr: logger.warning(f"Steampipe stderr:\n{result.stderr}") # Check if the command executed successfully if result.returncode != 0: error_message = f"Steampipe query failed with exit code {result.returncode}." if result.stderr: error_message += f"\nError details: {result.stderr}" logger.error(error_message) # Return the error message to Claude so it knows what went wrong return f"Error: {error_message}" # Attempt to parse the JSON output try: # Steampipe might return multiple JSON objects (one per row) or a single JSON array # Handle potential multiple JSON objects streamed line by line lines = result.stdout.strip().splitlines() if not lines: logger.info("Steampipe returned no output.") return "[]" # Return empty JSON array if no output # Try parsing as a single JSON array first (common case) try: parsed_json = json.loads(result.stdout) output_string = json.dumps(parsed_json, indent=2) logger.info("Successfully parsed Steampipe output as single JSON object/array.") return output_string except json.JSONDecodeError: # If single parse fails, try parsing line by line logger.warning("Failed to parse stdout as single JSON, attempting line-by-line parsing.") parsed_objects = [] for line in lines: try: parsed_objects.append(json.loads(line)) except json.JSONDecodeError as json_err_line: logger.error(f"Failed to parse line: {line}. Error: {json_err_line}") # Decide how to handle line parse errors, maybe return partial results or error if not parsed_objects: logger.error("Failed to parse any lines from Steampipe output.") return "Error: Failed to parse Steampipe JSON output." output_string = json.dumps(parsed_objects, indent=2) logger.info("Successfully parsed Steampipe output line-by-line.") return output_string except json.JSONDecodeError as json_err: error_message = f"Failed to parse Steampipe output as JSON. Error: {json_err}. Raw output: {result.stdout}" logger.error(error_message) return f"Error: {error_message}" # Return error and raw output for debugging except subprocess.TimeoutExpired: error_message = "Steampipe query timed out after 60 seconds." logger.error(error_message) return f"Error: {error_message}" except FileNotFoundError: error_message = "Steampipe command not found. Make sure it's installed and in your system PATH." logger.error(error_message) return f"Error: {error_message}" except Exception as e: error_message = f"An unexpected error occurred while running Steampipe: {e}" logger.exception(error_message) # Log full traceback return f"Error: {error_message}"