Skip to main content
Glama
artifacts.py12.1 kB
"""Artifacts for the Database Tuning Advisor.""" import difflib import json from typing import Any from attrs import define from attrs import field # If the recommendation cost is 0.0, we can't calculate the improvement multiple. # Return 1000000.0 to indicate infinite improvement. INFINITE_IMPROVEMENT_MULTIPLIER = 1000000.0 class ErrorResult: """Simple error result class.""" def to_text(self) -> str: return self.value def __init__(self, message: str): self.value = message def calculate_improvement_multiple(base_cost: float, rec_cost: float) -> float: """Calculate the improvement multiple from this recommendation.""" if base_cost <= 0.0: # base_cost or rec_cost might be zero, but as they are floats, the might be # represented as -0.0. That's why we compare to <= 0.0. return 1.0 if rec_cost <= 0.0: # If the recommendation cost is 0.0, we can't calculate the improvement multiple. # Return INFINITE_IMPROVEMENT_MULTIPLIER to indicate infinite improvement. return INFINITE_IMPROVEMENT_MULTIPLIER return base_cost / rec_cost @define class PlanNode: node_type: str total_cost: float startup_cost: float plan_rows: int plan_width: int # Actual metrics from ANALYZE actual_total_time: float | None = field(default=None) actual_startup_time: float | None = field(default=None) actual_rows: int | None = field(default=None) actual_loops: int | None = field(default=None) # Buffer info shared_hit_blocks: int | None = field(default=None) shared_read_blocks: int | None = field(default=None) shared_written_blocks: int | None = field(default=None) # Other common fields relation_name: str | None = field(default=None) filter: str | None = field(default=None) children: list["PlanNode"] = field(factory=list) @classmethod def from_json_data(cls, json_node: dict[str, Any]) -> "PlanNode": # Extract basic fields node = cls( node_type=json_node["Node Type"], total_cost=json_node["Total Cost"], startup_cost=json_node["Startup Cost"], plan_rows=json_node["Plan Rows"], plan_width=json_node["Plan Width"], ) # Optional ANALYZE fields if "Actual Total Time" in json_node: node.actual_total_time = json_node["Actual Total Time"] node.actual_startup_time = json_node["Actual Startup Time"] node.actual_rows = json_node["Actual Rows"] node.actual_loops = json_node["Actual Loops"] # Optional BUFFERS fields if "Shared Hit Blocks" in json_node: node.shared_hit_blocks = json_node["Shared Hit Blocks"] node.shared_read_blocks = json_node["Shared Read Blocks"] node.shared_written_blocks = json_node["Shared Written Blocks"] # Common optional fields if "Relation Name" in json_node: node.relation_name = json_node["Relation Name"] if "Filter" in json_node: node.filter = json_node["Filter"] # Recursively process child plans if "Plans" in json_node: node.children = [cls.from_json_data(child) for child in json_node["Plans"]] return node @define class ExplainPlanArtifact: value: str plan_tree: PlanNode planning_time: float | None = field(default=None) execution_time: float | None = field(default=None) def __init__( self, value: str, plan_tree: PlanNode, planning_time: float | None = None, execution_time: float | None = None, ): self.value = value self.plan_tree = plan_tree self.planning_time = planning_time self.execution_time = execution_time def to_text(self) -> str: """Convert the explain plan to a text representation. Returns: str: A string representation of the execution plan with timing information. """ result = [] # Add timing information if available if self.planning_time is not None: result.append(f"Planning Time: {self.planning_time:.3f} ms") if self.execution_time is not None: result.append(f"Execution Time: {self.execution_time:.3f} ms") # Add plan tree representation result.append(self._format_plan_node(self.plan_tree)) return "\n".join(result) @staticmethod def _format_plan_node(node: PlanNode, level: int = 0) -> str: """Recursively format a plan node and its children. Args: node: The plan node to format level: The current indentation level Returns: str: A formatted string representation of the node and its children """ indent = " " * level output = f"{indent}→ {node.node_type} (Cost: {node.startup_cost:.2f}..{node.total_cost:.2f})" # Add table name if present if node.relation_name: output += f" on {node.relation_name}" # Add rows information output += f" [Rows: {node.plan_rows}]" # Add actual metrics if available in a compact form if node.actual_total_time is not None: output += ( f" [Actual: {node.actual_startup_time:.2f}..{node.actual_total_time:.2f} ms, Rows: {node.actual_rows}, Loops: {node.actual_loops}]" ) # Add filter if present if node.filter: filter_text = node.filter # Truncate long filters for readability if len(filter_text) > 100: filter_text = filter_text[:97] + "..." output += f"\n{indent} Filter: {filter_text}" # Add buffer information if available in a compact form if node.shared_hit_blocks is not None: output += f"\n{indent} Buffers - hit: {node.shared_hit_blocks}, read: {node.shared_read_blocks}, written: {node.shared_written_blocks}" # Recursively format children if node.children: for child in node.children: output += "\n" + ExplainPlanArtifact._format_plan_node(child, level + 1) return output @classmethod def from_json_data(cls, plan_data: dict[str, Any]) -> "ExplainPlanArtifact": if "Plan" not in plan_data: raise ValueError("Missing 'Plan' field in explain plan data") # Create plan tree from the "Plan" field plan_tree = PlanNode.from_json_data(plan_data["Plan"]) # Extract optional timing information planning_time = plan_data.get("Planning Time") execution_time = plan_data.get("Execution Time") return cls( value=json.dumps(plan_data, indent=2), plan_tree=plan_tree, planning_time=planning_time, execution_time=execution_time, ) @staticmethod def format_plan_summary(plan_data): """Extract and format key information from a raw plan data.""" if not plan_data: return "No plan data available" try: # Create a PlanNode from the raw JSON data if "Plan" in plan_data: plan_node = PlanNode.from_json_data(plan_data["Plan"]) # Use _format_plan_node to format the output plan_tree = ExplainPlanArtifact._format_plan_node(plan_node, 0) return f"{plan_tree}" else: return "Invalid plan data (missing Plan field)" except Exception as e: return f"Error summarizing plan: {e}" @staticmethod def create_plan_diff(before_plan: dict[str, Any], after_plan: dict[str, Any]) -> str: """Generate a textual diff between two explain plans. Args: before_plan: The explain plan before changes after_plan: The explain plan after changes Returns: A string containing a readable diff between the two plans """ if not before_plan or not after_plan: return "Cannot generate diff: Missing plan data" try: # Create PlanNode objects from the plans before_tree = PlanNode.from_json_data(before_plan["Plan"]) if "Plan" in before_plan else None after_tree = PlanNode.from_json_data(after_plan["Plan"]) if "Plan" in after_plan else None if not before_tree or not after_tree: return "Cannot generate diff: Invalid plan structure" # Format the plans as text before_lines = ExplainPlanArtifact._format_plan_node(before_tree).split("\n") after_lines = ExplainPlanArtifact._format_plan_node(after_tree).split("\n") # Generate a readable diff with context diff_lines = [] diff_lines.append("PLAN CHANGES:") diff_lines.append("------------") # Extract cost information for a summary before_cost = before_tree.total_cost after_cost = after_tree.total_cost improvement = calculate_improvement_multiple(before_cost, after_cost) diff_lines.append(f"Cost: {before_cost:.2f} → {after_cost:.2f} ({improvement:.1f}x improvement)") diff_lines.append("") # Node type changes - a simplified structural diff diff_lines.append("Operation Changes:") # Helper function to extract node types with indentation def extract_node_types(node, level=0, result=None): if result is None: result = [] indent = " " * level node_info = f"{indent}→ {node.node_type}" if node.relation_name: node_info += f" on {node.relation_name}" result.append(node_info) for child in node.children: extract_node_types(child, level + 1, result) return result before_structure = extract_node_types(before_tree) after_structure = extract_node_types(after_tree) # Generate the structural diff structure_diff = list( difflib.unified_diff( before_structure, after_structure, n=1, # Context lines lineterm="", ) ) # Add structural diff to output if structure_diff: diff_lines.extend(structure_diff) else: diff_lines.append("No structural changes detected") # Add more specific details about key changes diff_lines.append("") diff_lines.append("Major Changes:") # Look for significant changes like seq scan to index scan, changed filters, etc. # This requires traversing both trees and comparing nodes # For simplicity, we'll just list key changes in cost and rows if before_tree.node_type != after_tree.node_type: diff_lines.append(f"- Root operation changed: {before_tree.node_type} → {after_tree.node_type}") # Compare scan methods used before_scans = [line for line in before_lines if "Seq Scan" in line] after_scans = [line for line in after_lines if "Seq Scan" in line] if len(before_scans) > len(after_scans): diff_lines.append(f"- {len(before_scans) - len(after_scans)} sequential scans replaced with more efficient access methods") # Look for new index scans before_idx_scans = [line for line in before_lines if "Index Scan" in line] after_idx_scans = [line for line in after_lines if "Index Scan" in line] if len(after_idx_scans) > len(before_idx_scans): diff_lines.append(f"- {len(after_idx_scans) - len(before_idx_scans)} new index scans used") return "\n".join(diff_lines) except Exception as e: return f"Error generating plan diff: {e}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/crystaldba/postgres-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server