execute_query
Run Kusto Query Language (KQL) queries on Azure Data Explorer databases to retrieve structured data in Python-friendly dictionary format.
Instructions
Executes a Kusto Query Language (KQL) query against the configured Azure Data Explorer database and returns the results as a list of dictionaries.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes |
Implementation Reference
- src/adx_mcp_server/server.py:173-195 (handler)The handler function for the 'execute_query' tool, decorated with @mcp.tool for automatic registration in FastMCP. Executes KQL queries using KustoClient on the configured Azure Data Explorer cluster and database, formats results, and handles errors.@mcp.tool(description="Executes a Kusto Query Language (KQL) query against the configured Azure Data Explorer database and returns the results as a list of dictionaries.") async def execute_query(query: str) -> List[Dict[str, Any]]: """Execute a KQL query against the configured ADX database.""" logger.info("Executing KQL query", database=config.database, query_preview=query[:100]) if not config.cluster_url or not config.database: logger.error("Missing ADX configuration") raise ValueError("Azure Data Explorer configuration is missing. Please set ADX_CLUSTER_URL and ADX_DATABASE environment variables.") try: client = get_kusto_client() result_set = client.execute(config.database, query) results = format_query_results(result_set) logger.info("Query executed successfully", row_count=len(results)) return results except Exception as e: logger.error( "Query execution failed", error=str(e), exception_type=type(e).__name__, database=config.database ) raise
- src/adx_mcp_server/server.py:138-171 (helper)Supporting helper function that formats the raw result set from Kusto query execution into a standardized list of dictionaries, mapping column names to values.def format_query_results(result_set) -> List[Dict[str, Any]]: """ Format Kusto query results into a list of dictionaries. Args: result_set: Raw result set from KustoClient Returns: List of dictionaries with column names as keys """ if not result_set or not result_set.primary_results: logger.debug("Empty or null result set received") return [] try: primary_result = result_set.primary_results[0] columns = [col.column_name for col in primary_result.columns] formatted_results = [] for row in primary_result.rows: record = {} for i, value in enumerate(row): record[columns[i]] = value formatted_results.append(record) logger.debug("Query results formatted", row_count=len(formatted_results), columns=columns) return formatted_results except Exception as e: logger.error( "Error formatting query results", error=str(e), exception_type=type(e).__name__ ) raise
- src/adx_mcp_server/server.py:83-136 (helper)Supporting utility to initialize the KustoClient with Azure credentials, supporting WorkloadIdentityCredential for AKS or falling back to DefaultAzureCredential.def get_kusto_client() -> KustoClient: """ Create and configure a Kusto client with appropriate Azure credentials. Prioritizes WorkloadIdentityCredential when running in AKS with workload identity, falls back to DefaultAzureCredential for other authentication methods. Returns: KustoClient: Configured Kusto client instance """ tenant_id = os.environ.get('AZURE_TENANT_ID') client_id = os.environ.get('AZURE_CLIENT_ID') token_file_path = os.environ.get('ADX_TOKEN_FILE_PATH', '/var/run/secrets/azure/tokens/azure-identity-token') if tenant_id and client_id: logger.info( "Using WorkloadIdentityCredential", client_id=client_id, tenant_id=tenant_id, token_file_path=token_file_path ) try: credential = WorkloadIdentityCredential( tenant_id=tenant_id, client_id=client_id, token_file_path=token_file_path ) except Exception as e: logger.warning( "Failed to initialize WorkloadIdentityCredential, falling back", error=str(e), exception_type=type(e).__name__ ) credential = DefaultAzureCredential() else: logger.info("Using DefaultAzureCredential (missing WorkloadIdentity credentials)") credential = DefaultAzureCredential() try: kcsb = KustoConnectionStringBuilder.with_azure_token_credential( connection_string=config.cluster_url, credential=credential ) client = KustoClient(kcsb) logger.debug("Kusto client initialized successfully", cluster_url=config.cluster_url) return client except Exception as e: logger.error( "Failed to create Kusto client", error=str(e), exception_type=type(e).__name__, cluster_url=config.cluster_url ) raise