list_tables
Retrieve a list of all tables in your Azure Data Explorer database, including table names, folders, and database associations.
Instructions
Retrieves a list of all tables available in the configured Azure Data Explorer database, including their names, folders, and database associations.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- src/adx_mcp_server/server.py:223-241 (handler)The 'list_tables' tool handler function. It is decorated with @mcp.tool, connects to ADX, runs '.show tables | project TableName, Folder, DatabaseName', formats and returns results.
@mcp.tool(description="Retrieves a list of all tables available in the configured Azure Data Explorer database, including their names, folders, and database associations.") async def list_tables() -> List[Dict[str, Any]]: """List all tables in the configured ADX database.""" logger.info("Listing tables", database=config.database) if not config.cluster_url or not config.database: logger.error("Missing ADX configuration") raise ValueError("Azure Data Explorer configuration is missing. Please set ADX_CLUSTER_URL and ADX_DATABASE environment variables.") try: client = get_kusto_client() query = ".show tables | project TableName, Folder, DatabaseName" result_set = client.execute(config.database, query) results = format_query_results(result_set) logger.info("Tables listed successfully", table_count=len(results)) return results except Exception as e: logger.error("Failed to list tables", error=str(e), exception_type=type(e).__name__) raise - src/adx_mcp_server/server.py:223-223 (registration)The tool registration using the @mcp.tool decorator from FastMCP, which registers 'list_tables' as an MCP tool with FastMCP.
@mcp.tool(description="Retrieves a list of all tables available in the configured Azure Data Explorer database, including their names, folders, and database associations.") - src/adx_mcp_server/server.py:84-137 (helper)The get_kusto_client() helper used by list_tables to create a KustoClient with Azure credentials.
def get_kusto_client() -> KustoClient: """ Create and configure a Kusto client with appropriate Azure credentials. Prioritizes WorkloadIdentityCredential when running in AKS with workload identity, falls back to DefaultAzureCredential for other authentication methods. Returns: KustoClient: Configured Kusto client instance """ tenant_id = os.environ.get('AZURE_TENANT_ID') client_id = os.environ.get('AZURE_CLIENT_ID') token_file_path = os.environ.get('ADX_TOKEN_FILE_PATH', '/var/run/secrets/azure/tokens/azure-identity-token') if tenant_id and client_id: logger.info( "Using WorkloadIdentityCredential", client_id=client_id, tenant_id=tenant_id, token_file_path=token_file_path ) try: credential = WorkloadIdentityCredential( tenant_id=tenant_id, client_id=client_id, token_file_path=token_file_path ) except Exception as e: logger.warning( "Failed to initialize WorkloadIdentityCredential, falling back", error=str(e), exception_type=type(e).__name__ ) credential = DefaultAzureCredential() else: logger.info("Using DefaultAzureCredential (missing WorkloadIdentity credentials)") credential = DefaultAzureCredential() try: kcsb = KustoConnectionStringBuilder.with_azure_token_credential( connection_string=config.cluster_url, credential=credential ) client = KustoClient(kcsb) logger.debug("Kusto client initialized successfully", cluster_url=config.cluster_url) return client except Exception as e: logger.error( "Failed to create Kusto client", error=str(e), exception_type=type(e).__name__, cluster_url=config.cluster_url ) raise - src/adx_mcp_server/server.py:139-172 (helper)The format_query_results() helper used by list_tables to format Kusto query results into dictionaries.
def format_query_results(result_set) -> List[Dict[str, Any]]: """ Format Kusto query results into a list of dictionaries. Args: result_set: Raw result set from KustoClient Returns: List of dictionaries with column names as keys """ if not result_set or not result_set.primary_results: logger.debug("Empty or null result set received") return [] try: primary_result = result_set.primary_results[0] columns = [col.column_name for col in primary_result.columns] formatted_results = [] for row in primary_result.rows: record = {} for i, value in enumerate(row): record[columns[i]] = value formatted_results.append(record) logger.debug("Query results formatted", row_count=len(formatted_results), columns=columns) return formatted_results except Exception as e: logger.error( "Error formatting query results", error=str(e), exception_type=type(e).__name__ ) raise