ydb_describe_path
Retrieve detailed metadata about YDB database paths, including tables and directories, to understand structure and properties.
Instructions
Get detailed information about a YDB path (table, directory, etc.)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| path | Yes | | |
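
As a rough illustration of what a conforming argument object looks like, the sketch below checks a candidate payload against the input schema shown in the registration entry. `validate_args` is a hypothetical helper written for this example (it is not part of ydb-mcp), and `/local/my_table` is a placeholder path.

```python
import json

# Input schema as it appears in the tool registration entry.
INPUT_SCHEMA = {
    "properties": {"path": {"type": "string", "title": "Path"}},
    "required": ["path"],
    "type": "object",
}


def validate_args(args: dict) -> list[str]:
    """Hypothetical helper: return a list of problems (empty if args fit the schema)."""
    problems = []
    # Every name listed under "required" must be present.
    for name in INPUT_SCHEMA["required"]:
        if name not in args:
            problems.append(f"missing required field: {name}")
    # Only the "string" type is used by this schema, so only that is checked.
    for name, spec in INPUT_SCHEMA["properties"].items():
        if name in args and spec["type"] == "string" and not isinstance(args[name], str):
            problems.append(f"field {name} must be a string")
    return problems


example_args = {"path": "/local/my_table"}  # placeholder path
print(json.dumps(example_args))
print(validate_args(example_args))
print(validate_args({}))
```

A full JSON Schema validator would cover more cases; this hand-rolled check only mirrors the two constraints the schema actually states (an object with a required string `path`).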
Implementation Reference
- ydb_mcp/server.py:768-955 (handler) The primary handler function for the 'ydb_describe_path' tool. It uses YDB's scheme_client.describe_path to retrieve basic path information and, if the path is a table, uses table_client to fetch the detailed table schema, including columns, indexes, and partitioning settings. It returns the description as formatted JSON.

      async def describe_path(self, path: str) -> List[TextContent]:
          """Describe a path in YDB.

          Args:
              path: Path to describe

          Returns:
              List of TextContent objects with path description
          """
          # Check for authentication errors
          if self.auth_error:
              safe_error = {"error": self.auth_error}
              return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]

          try:
              # Create driver if needed
              if self.driver is None:
                  await self.create_driver()

              if self.driver is None:
                  safe_error = {"error": "Failed to create driver"}
                  return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]

              # Access the scheme client
              scheme_client = self.driver.scheme_client

              # Describe the path
              logger.info(f"Describing path: {path}")
              path_response = await scheme_client.describe_path(path)

              # Process the response
              if path_response is None:
                  safe_error = {"error": f"Path '{path}' not found"}
                  return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]

              # Format the result
              result = {
                  "path": path,
                  "type": str(path_response.type),
                  "name": path_response.name,
                  "owner": path_response.owner,
              }

              # Add permissions if available
              if hasattr(path_response, "permissions") and path_response.permissions:
                  result["permissions"] = []
                  for perm in path_response.permissions:
                      result["permissions"].append(
                          {"subject": perm.subject, "permission_names": list(perm.permission_names)}
                      )

              # Add table specific information if it's a table
              if str(path_response.type) == "TABLE" or path_response.type == 2:
                  try:
                      # Get table client for more detailed table info
                      table_client = self.driver.table_client
                      session = await table_client.session().create()
                      try:
                          # Get detailed table description
                          table_desc = await session.describe_table(path)

                          result["table"] = {
                              "columns": [],
                              "primary_key": table_desc.primary_key,
                              "indexes": [],
                              "partitioning_settings": {},
                              "storage_settings": {},
                              "key_bloom_filter": table_desc.key_bloom_filter,
                              "read_replicas_settings": table_desc.read_replicas_settings,
                              "column_families": [],
                          }

                          # Add columns with more details
                          for column in table_desc.columns:
                              col_info = {
                                  "name": column.name,
                                  "type": str(column.type),
                                  "family": column.family,
                              }
                              result["table"]["columns"].append(col_info)

                          # Add indexes with more details
                          for index in table_desc.indexes:
                              index_info = {
                                  "name": index.name,
                                  "index_columns": list(index.index_columns),
                                  "cover_columns": (list(index.cover_columns) if hasattr(index, "cover_columns") else []),
                                  "index_type": (str(index.index_type) if hasattr(index, "index_type") else None),
                              }
                              result["table"]["indexes"].append(index_info)

                          # Add column families if present
                          if hasattr(table_desc, "column_families"):
                              for family in table_desc.column_families:
                                  family_info = {
                                      "name": family.name,
                                      "data": family.data,
                                      "compression": (str(family.compression) if hasattr(family, "compression") else None),
                                  }
                                  result["table"]["column_families"].append(family_info)

                          # Add storage settings if present
                          if hasattr(table_desc, "storage_settings"):
                              ss = table_desc.storage_settings
                              if ss:
                                  result["table"]["storage_settings"] = {
                                      "tablet_commit_log0": ss.tablet_commit_log0,
                                      "tablet_commit_log1": ss.tablet_commit_log1,
                                      "external": ss.external,
                                      "store_external": ss.store_external,
                                  }

                          # Add partitioning settings if present
                          if hasattr(table_desc, "partitioning_settings"):
                              ps = table_desc.partitioning_settings
                              if ps:
                                  if hasattr(ps, "partition_at_keys"):
                                      result["table"]["partitioning_settings"]["partition_at_keys"] = ps.partition_at_keys
                                  if hasattr(ps, "partition_by_size"):
                                      result["table"]["partitioning_settings"]["partition_by_size"] = ps.partition_by_size
                                  if hasattr(ps, "min_partitions_count"):
                                      result["table"]["partitioning_settings"]["min_partitions_count"] = (
                                          ps.min_partitions_count
                                      )
                                  if hasattr(ps, "max_partitions_count"):
                                      result["table"]["partitioning_settings"]["max_partitions_count"] = (
                                          ps.max_partitions_count
                                      )
                      finally:
                          # Always release the session
                          await session.close()

                  except Exception as table_error:
                      logger.warning(f"Error getting detailed table info: {table_error}")
                      # Fallback to basic table info from path_response
                      if hasattr(path_response, "table") and path_response.table:
                          result["table"] = {
                              "columns": [],
                              "primary_key": (
                                  path_response.table.primary_key if hasattr(path_response.table, "primary_key") else []
                              ),
                              "indexes": [],
                              "partitioning_settings": {},
                          }

                          # Add basic columns
                          if hasattr(path_response.table, "columns"):
                              for column in path_response.table.columns:
                                  result["table"]["columns"].append({"name": column.name, "type": str(column.type)})

                          # Add basic indexes
                          if hasattr(path_response.table, "indexes"):
                              for index in path_response.table.indexes:
                                  result["table"]["indexes"].append(
                                      {
                                          "name": index.name,
                                          "index_columns": (
                                              list(index.index_columns) if hasattr(index, "index_columns") else []
                                          ),
                                      }
                                  )

                          # Add basic partitioning settings
                          if hasattr(path_response.table, "partitioning_settings"):
                              ps = path_response.table.partitioning_settings
                              if ps:
                                  if hasattr(ps, "partition_at_keys"):
                                      result["table"]["partitioning_settings"]["partition_at_keys"] = ps.partition_at_keys
                                  if hasattr(ps, "partition_by_size"):
                                      result["table"]["partitioning_settings"]["partition_by_size"] = ps.partition_by_size
                                  if hasattr(ps, "min_partitions_count"):
                                      result["table"]["partitioning_settings"]["min_partitions_count"] = (
                                          ps.min_partitions_count
                                      )
                                  if hasattr(ps, "max_partitions_count"):
                                      result["table"]["partitioning_settings"]["max_partitions_count"] = (
                                          ps.max_partitions_count
                                      )

              # Convert to JSON string and return as TextContent
              formatted_result = json.dumps(result, indent=2, cls=CustomJSONEncoder)
              return [TextContent(type="text", text=formatted_result)]

          except Exception as e:
              logger.exception(f"Error describing path {path}: {e}")
              safe_error = {"error": f"Error describing path {path}: {str(e)}"}
              return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]

- ydb_mcp/server.py:628-658 (registration) Tool registration in the register_tools method. Defines the tool specification, including name, description, handler reference, and input schema (parameters). Registers the tool with both FastMCP (self.add_tool) and the internal ToolManager.

          {
              "name": "ydb_describe_path",
              "description": "Get detailed information about a YDB path (table, directory, etc.)",
              "handler": self.describe_path,
              "parameters": {
                  "properties": {"path": {"type": "string", "title": "Path"}},
                  "required": ["path"],
                  "type": "object",
              },
          },
      ]

      # Register all tools with FastMCP framework
      for spec in tool_specs:
          self.add_tool(
              spec["handler"],
              name=spec["name"],
              description=spec["description"],
              # Structured output is temporarily disabled until proper schema definitions are implemented.
              # See https://github.com/ydb-platform/ydb-mcp/issues/12 for details.
              structured_output=False,
          )

          # Also register with our tool manager
          self.tool_manager.register_tool(
              name=spec["name"],
              handler=spec["handler"],
              description=spec["description"],
              parameters=spec.get("parameters"),
          )

- ydb_mcp/server.py:628-637 (schema) JSON schema definition for the tool's input parameters: requires a 'path' string.

      {
          "name": "ydb_describe_path",
          "description": "Get detailed information about a YDB path (table, directory, etc.)",
          "handler": self.describe_path,
          "parameters": {
              "properties": {"path": {"type": "string", "title": "Path"}},
              "required": ["path"],
              "type": "object",
          },
      },

- ydb_mcp/server.py:1068-1069 (handler) Dispatch logic in the overridden call_tool method that routes calls to the describe_path handler.

      elif tool_name == "ydb_describe_path" and "path" in params:
          result = await self.describe_path(path=params["path"])
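
Because the handler returns its result as JSON text (either an object with an `error` key or a description with `path`, `type`, `name`, `owner`, and an optional `table` section), a caller will typically parse that text before using it. The following is a minimal client-side sketch under that assumption. `summarize_description` is a hypothetical helper, and the sample payload uses made-up values; in particular, the exact string stored in `type` depends on how the YDB SDK renders its enum.

```python
import json


def summarize_description(text: str) -> dict:
    """Hypothetical helper: condense the tool's JSON text into a short summary."""
    data = json.loads(text)
    # The handler reports failures as {"error": "..."} rather than raising.
    if "error" in data:
        raise RuntimeError(data["error"])
    # "table" is only present when the described path is a table.
    table = data.get("table") or {}
    return {
        "path": data["path"],
        "type": data["type"],
        "columns": [col["name"] for col in table.get("columns", [])],
        "primary_key": list(table.get("primary_key", [])),
        "indexes": [idx["name"] for idx in table.get("indexes", [])],
    }


# Illustrative payload only; the values are invented for this example.
sample = {
    "path": "/local/users",
    "type": "TABLE",
    "name": "users",
    "owner": "root",
    "table": {
        "columns": [
            {"name": "id", "type": "Uint64", "family": None},
            {"name": "email", "type": "Utf8?", "family": None},
        ],
        "primary_key": ["id"],
        "indexes": [{"name": "idx_email", "index_columns": ["email"]}],
        "partitioning_settings": {},
    },
}

summary = summarize_description(json.dumps(sample))
print(summary)
```

Checking for the `error` key first matters here because the handler signals authentication failures, driver failures, and missing paths through the same text channel as successful results.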