Skip to main content
Glama

Azure MCP Server

Official
MIT License
1,161
  • Linux
  • Apple
KustoService.cs12.8 kB
// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using Azure.ResourceManager.Kusto; using AzureMcp.Core.Options; using AzureMcp.Core.Services.Azure; using AzureMcp.Core.Services.Azure.Subscription; using AzureMcp.Core.Services.Azure.Tenant; using AzureMcp.Core.Services.Caching; using AzureMcp.Core.Services.Http; using AzureMcp.Kusto.Commands; namespace AzureMcp.Kusto.Services; public sealed class KustoService( ISubscriptionService subscriptionService, ITenantService tenantService, ICacheService cacheService, IHttpClientService httpClientService) : BaseAzureService(tenantService), IKustoService { private readonly ISubscriptionService _subscriptionService = subscriptionService ?? throw new ArgumentNullException(nameof(subscriptionService)); private readonly ICacheService _cacheService = cacheService ?? throw new ArgumentNullException(nameof(cacheService)); private readonly IHttpClientService _httpClientService = httpClientService ?? throw new ArgumentNullException(nameof(httpClientService)); private const string CacheGroup = "kusto"; private const string KustoClustersCacheKey = "clusters"; private static readonly TimeSpan s_cacheDuration = TimeSpan.FromHours(1); private static readonly TimeSpan s_providerCacheDuration = TimeSpan.FromHours(2); // Provider cache key generator private static string GetProviderCacheKey(string clusterUri) => $"{clusterUri}"; public async Task<List<string>> ListClusters( string subscriptionId, string? tenant = null, RetryPolicyOptions? retryPolicy = null) { ValidateRequiredParameters(subscriptionId); // Create cache key var cacheKey = string.IsNullOrEmpty(tenant) ? $"{KustoClustersCacheKey}_{subscriptionId}" : $"{KustoClustersCacheKey}_{subscriptionId}_{tenant}"; // Try to get from cache first var cachedClusters = await _cacheService.GetAsync<List<string>>(CacheGroup, cacheKey, s_cacheDuration); if (cachedClusters != null) { return cachedClusters; } var subscription = await _subscriptionService.GetSubscription(subscriptionId, tenant, retryPolicy); var clusters = new List<string>(); await foreach (var cluster in subscription.GetKustoClustersAsync()) { if (cluster?.Data?.Name != null) { clusters.Add(cluster.Data.Name); } } await _cacheService.SetAsync(CacheGroup, cacheKey, clusters, s_cacheDuration); return clusters; } public async Task<KustoClusterResourceProxy?> GetCluster( string subscriptionId, string clusterName, string? tenant = null, RetryPolicyOptions? retryPolicy = null) { ValidateRequiredParameters(subscriptionId); var subscription = await _subscriptionService.GetSubscription(subscriptionId, tenant, retryPolicy); await foreach (var cluster in subscription.GetKustoClustersAsync()) { if (string.Equals(cluster.Data.Name, clusterName, StringComparison.OrdinalIgnoreCase)) { return new KustoClusterResourceProxy(cluster); } } return null; } public async Task<List<string>> ListDatabases( string subscriptionId, string clusterName, string? tenant = null, AuthMethod? authMethod = AuthMethod.Credential, RetryPolicyOptions? retryPolicy = null) { ValidateRequiredParameters(subscriptionId, clusterName); string clusterUri = await GetClusterUri(subscriptionId, clusterName, tenant, retryPolicy); return await ListDatabases(clusterUri, tenant, authMethod, retryPolicy); } public async Task<List<string>> ListDatabases( string clusterUri, string? tenant = null, AuthMethod? authMethod = AuthMethod.Credential, RetryPolicyOptions? retryPolicy = null) { ValidateRequiredParameters(clusterUri); var kustoClient = await GetOrCreateKustoClient(clusterUri, tenant).ConfigureAwait(false); var kustoResult = await kustoClient.ExecuteControlCommandAsync( "NetDefaultDB", ".show databases | project DatabaseName", CancellationToken.None); return KustoResultToStringList(kustoResult); } public async Task<List<string>> ListTables( string subscriptionId, string clusterName, string databaseName, string? tenant = null, AuthMethod? authMethod = AuthMethod.Credential, RetryPolicyOptions? retryPolicy = null) { ValidateRequiredParameters(subscriptionId, clusterName, databaseName); string clusterUri = await GetClusterUri(subscriptionId, clusterName, tenant, retryPolicy); return await ListTables(clusterUri, databaseName, tenant, authMethod, retryPolicy); } public async Task<List<string>> ListTables( string clusterUri, string databaseName, string? tenant = null, AuthMethod? authMethod = AuthMethod.Credential, RetryPolicyOptions? retryPolicy = null) { ValidateRequiredParameters(clusterUri, databaseName); var kustoClient = await GetOrCreateKustoClient(clusterUri, tenant); var kustoResult = await kustoClient.ExecuteControlCommandAsync( databaseName, ".show tables", CancellationToken.None); return KustoResultToStringList(kustoResult); } public async Task<string> GetTableSchema( string subscriptionId, string clusterName, string databaseName, string tableName, string? tenant = null, AuthMethod? authMethod = AuthMethod.Credential, RetryPolicyOptions? retryPolicy = null) { string clusterUri = await GetClusterUri(subscriptionId, clusterName, tenant, retryPolicy); return await GetTableSchema(clusterUri, databaseName, tableName, tenant, authMethod, retryPolicy); } public async Task<string> GetTableSchema( string clusterUri, string databaseName, string tableName, string? tenant = null, AuthMethod? authMethod = AuthMethod.Credential, RetryPolicyOptions? retryPolicy = null) { ValidateRequiredParameters(clusterUri, databaseName, tableName); var kustoClient = await GetOrCreateKustoClient(clusterUri, tenant); var kustoResult = await kustoClient.ExecuteQueryCommandAsync( databaseName, $".show table {tableName} cslschema", CancellationToken.None); var result = KustoResultToStringList(kustoResult); var schema = result.FirstOrDefault(); if (schema is not null) { return schema; } throw new Exception($"No schema found for table '{tableName}' in database '{databaseName}'."); } public async Task<List<JsonElement>> QueryItems( string subscriptionId, string clusterName, string databaseName, string query, string? tenant = null, AuthMethod? authMethod = AuthMethod.Credential, RetryPolicyOptions? retryPolicy = null) { ValidateRequiredParameters(subscriptionId, clusterName, databaseName, query); string clusterUri = await GetClusterUri(subscriptionId, clusterName, tenant, retryPolicy); return await QueryItems(clusterUri, databaseName, query, tenant, authMethod, retryPolicy); } public async Task<List<JsonElement>> QueryItems( string clusterUri, string databaseName, string query, string? tenant = null, AuthMethod? authMethod = AuthMethod.Credential, RetryPolicyOptions? retryPolicy = null) { ValidateRequiredParameters(clusterUri, databaseName, query); var cslQueryProvider = await GetOrCreateCslQueryProvider(clusterUri, tenant); var result = new List<JsonElement>(); var kustoResult = await cslQueryProvider.ExecuteQueryCommandAsync(databaseName, query, CancellationToken.None); if (kustoResult.JsonDocument is null) { return result; } var root = kustoResult.JsonDocument.RootElement; if (!root.TryGetProperty("Tables", out var tablesElement) || tablesElement.ValueKind != JsonValueKind.Array || tablesElement.GetArrayLength() == 0) { return result; } var table = tablesElement[0]; if (!table.TryGetProperty("Columns", out var columnsElement) || columnsElement.ValueKind != JsonValueKind.Array) { return result; } var columnsDict = columnsElement.EnumerateArray() .ToDictionary( column => column.GetProperty("ColumnName").GetString()!, column => column.GetProperty("ColumnType").GetString()! ); var columnsDictJson = "{" + string.Join(",", columnsDict.Select(kvp => $"\"{JsonEncodedText.Encode(kvp.Key)}\":\"{JsonEncodedText.Encode(kvp.Value)}\"")) + "}"; result.Add(JsonDocument.Parse(columnsDictJson).RootElement); if (!table.TryGetProperty("Rows", out var items) || items.ValueKind != JsonValueKind.Array) { return result; } foreach (var item in items.EnumerateArray()) { var json = item.ToString(); result.Add(JsonDocument.Parse(json).RootElement); } return result; } private List<string> KustoResultToStringList(KustoResult kustoResult) { var result = new List<string>(); if (kustoResult.JsonDocument is null) { return result; } var root = kustoResult.JsonDocument.RootElement; if (!root.TryGetProperty("Tables", out var tablesElement) || tablesElement.ValueKind != JsonValueKind.Array || tablesElement.GetArrayLength() == 0) { return result; } var table = tablesElement[0]; if (!table.TryGetProperty("Columns", out var columnsElement) || columnsElement.ValueKind != JsonValueKind.Array) { return result; } var columns = columnsElement.EnumerateArray() .Select(column => ($"{column.GetProperty("ColumnName").GetString()}:{column.GetProperty("ColumnType").GetString()}")); var columnsAsString = string.Join(",", columns); result.Add(columnsAsString); if (!table.TryGetProperty("Rows", out var items) || items.ValueKind != JsonValueKind.Array) { return result; } foreach (var item in items.EnumerateArray()) { var jsonAsString = item.ToString(); result.Add(jsonAsString); } return result; } private async Task<KustoClient> GetOrCreateKustoClient(string clusterUri, string? tenant) { var providerCacheKey = GetProviderCacheKey(clusterUri) + "_command"; var kustoClient = await _cacheService.GetAsync<KustoClient>(CacheGroup, providerCacheKey, s_providerCacheDuration); if (kustoClient == null) { var tokenCredential = await GetCredential(tenant); kustoClient = new KustoClient(clusterUri, tokenCredential, UserAgent, _httpClientService); await _cacheService.SetAsync(CacheGroup, providerCacheKey, kustoClient, s_providerCacheDuration); } return kustoClient; } private async Task<KustoClient> GetOrCreateCslQueryProvider(string clusterUri, string? tenant) { var providerCacheKey = GetProviderCacheKey(clusterUri) + "_query"; var kustoClient = await _cacheService.GetAsync<KustoClient>(CacheGroup, providerCacheKey, s_providerCacheDuration); if (kustoClient == null) { var tokenCredential = await GetCredential(tenant); kustoClient = new KustoClient(clusterUri, tokenCredential, UserAgent, _httpClientService); await _cacheService.SetAsync(CacheGroup, providerCacheKey, kustoClient, s_providerCacheDuration); } return kustoClient; } private async Task<string> GetClusterUri( string subscriptionId, string clusterName, string? tenant, RetryPolicyOptions? retryPolicy) { var cluster = await GetCluster(subscriptionId, clusterName, tenant, retryPolicy); var value = cluster?.ClusterUri; if (string.IsNullOrEmpty(value)) { throw new Exception($"Could not retrieve ClusterUri for cluster '{clusterName}'"); } return value!; } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Azure/azure-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server