KustoClient.cs•3.97 kB
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System.Text.Json.Nodes;
using Azure.Core;
using AzureMcp.Core.Services.Http;
namespace AzureMcp.Kusto.Services;
public class KustoClient(
    string clusterUri,
    TokenCredential tokenCredential,
    string userAgent,
    IHttpClientService httpClientService)
{
    private readonly string _clusterUri = clusterUri;
    private readonly TokenCredential _tokenCredential = tokenCredential;
    private readonly string _userAgent = userAgent;
    private readonly HttpClient _httpClient = httpClientService.CreateClient(new Uri(clusterUri));
    private static readonly string s_application = "AzureMCP";
    private static readonly string s_clientRequestIdPrefix = "AzMcp";
    private static readonly string s_default_scope = "https://kusto.kusto.windows.net/.default";
    public Task<KustoResult> ExecuteQueryCommandAsync(string database, string text, CancellationToken cancellationToken)
        => ExecuteCommandAsync("/v1/rest/query", database, text, cancellationToken);
    public Task<KustoResult> ExecuteControlCommandAsync(string database, string text, CancellationToken cancellationToken)
        => ExecuteCommandAsync("/v1/rest/mgmt", database, text, cancellationToken);
    private async Task<KustoResult> ExecuteCommandAsync(string endpoint, string database, string text, CancellationToken cancellationToken)
    {
        var uri = _clusterUri + endpoint;
        var httpRequest = await GenerateRequestAsync(uri, database, text, cancellationToken).ConfigureAwait(false);
        return await SendRequestAsync(_httpClient, httpRequest, cancellationToken).ConfigureAwait(false);
    }
    private async Task<HttpRequestMessage> GenerateRequestAsync(string uri, string database, string text, CancellationToken cancellationToken = default)
    {
        var httpRequest = new HttpRequestMessage(HttpMethod.Post, uri);
        var scopes = new string[]
        {
            s_default_scope
        };
        var clientRequestId = s_clientRequestIdPrefix + Guid.NewGuid().ToString();
        var tokenRequestContext = new TokenRequestContext(scopes, clientRequestId);
        var accessToken = await _tokenCredential.GetTokenAsync(tokenRequestContext, cancellationToken);
        httpRequest.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("bearer", accessToken.Token);
        httpRequest.Headers.Add("User-Agent", _userAgent);
        httpRequest.Headers.Add("x-ms-client-request-id", clientRequestId);
        httpRequest.Headers.Add("x-ms-app", s_application);
        httpRequest.Headers.Add("x-ms-client-version", "Kusto.Client.Light");
        httpRequest.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("application/json"));
        var body = new JsonObject
        {
            { "db", database },
            { "csl", text }
        };
        var properties = new JsonObject
        {
            { "ClientRequestId", clientRequestId }
        };
        body.Add("properties", properties);
        var bodyStr = body.ToJsonString();
        httpRequest.Content = new StringContent(bodyStr);
        httpRequest.Content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json", "utf-8");
        return httpRequest;
    }
    private async Task<KustoResult> SendRequestAsync(HttpClient httpClient, HttpRequestMessage httpRequest, CancellationToken cancellationToken = default)
    {
        var httpResponse = await httpClient.SendAsync(httpRequest, HttpCompletionOption.ResponseContentRead, cancellationToken);
        if (!httpResponse.IsSuccessStatusCode)
        {
            string errorContent = await httpResponse.Content.ReadAsStringAsync();
            throw new HttpRequestException($"Request failed with status code {httpResponse.StatusCode}: {errorContent}");
        }
        return KustoResult.FromHttpResponseMessage(httpResponse);
    }
}