Skip to main content
Glama

powerbi-tabular-mcp

TabularConnection.cs24.5 kB
using System.Data; using Microsoft.AnalysisServices.AdomdClient; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using pbi_local_mcp.Configuration; using pbi_local_mcp.Core; namespace pbi_local_mcp; #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member public enum QueryType #pragma warning restore CS1591 // Missing XML comment for publicly visible type or member { #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member DAX, #pragma warning restore CS1591 // Missing XML comment for publicly visible type or member #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member DMV #pragma warning restore CS1591 // Missing XML comment for publicly visible type or member } /// <summary> /// Implements connection and query execution for PowerBI's tabular model /// </summary> public class TabularConnection : ITabularConnection, IDisposable { private readonly ILogger<TabularConnection> _logger = null!; private readonly string _connectionString; private readonly AdomdConnection _connection; private bool _disposed; // Metadata / diagnostics private readonly int _port; private readonly string? _databaseId; private readonly Version _assemblyVersion = typeof(TabularConnection).Assembly.GetName().Version ?? new Version(0, 0, 0, 0); /// <summary> /// Gets the numeric port the connection was initialized with (0 if unknown or not parsed). /// </summary> public int Port => _port; /// <summary> /// Gets the database (catalog) identifier for the connected model, or null if not provided. /// </summary> public string? DatabaseId => _databaseId; /// <summary> /// UTC timestamp when this connection instance was created. /// </summary> public DateTime StartupUtc { get; } = DateTime.UtcNow; /// <summary> /// Version of the assembly implementing <see cref="TabularConnection"/>. /// </summary> public Version AssemblyVersion => _assemblyVersion; // Schema summary caching private volatile SchemaSummary? _schemaCache; private DateTime _schemaCacheExpiry; private static readonly TimeSpan SchemaCacheTtl = TimeSpan.FromSeconds(30); /// <summary> /// Default command timeout in seconds /// </summary> private const int DefaultCommandTimeout = 60; /// <summary> /// Initializes a new instance of the TabularConnection class with configuration settings /// </summary> /// <param name="config">Power BI configuration settings</param> /// <param name="logger">Logger instance for tracing.</param> /// <exception cref="ArgumentNullException">Thrown when configuration is null</exception> /// <exception cref="InvalidOperationException">Thrown when configuration values are missing or invalid</exception> public TabularConnection(PowerBiConfig config, ILogger<TabularConnection> logger) { if (config == null) throw new ArgumentNullException(nameof(config)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); config.Validate(); _connectionString = $"Data Source=localhost:{config.Port};Initial Catalog={config.DbId}"; _connection = new AdomdConnection(_connectionString); if (!int.TryParse(config.Port, out _port) || _port < 0) _port = 0; _databaseId = string.IsNullOrWhiteSpace(config.DbId) ? null : config.DbId; } /// <summary> /// Initializes a new instance of <see cref="TabularConnection"/> using the default null logger. /// </summary> /// <param name="config">Configuration settings for the Power BI connection.</param> public TabularConnection(PowerBiConfig config) : this(config, NullLogger<TabularConnection>.Instance) { } /// <summary> /// Initializes a new instance of the TabularConnection class with explicit connection parameters /// </summary> /// <param name="logger">Logger for diagnostic information</param> /// <param name="port">Port number for the Power BI instance</param> /// <param name="dbId">Database ID (catalog name) to connect to</param> /// <exception cref="ArgumentNullException">Thrown when required parameters are null</exception> /// <exception cref="ArgumentException">Thrown when required parameters are empty</exception> public TabularConnection(ILogger<TabularConnection> logger, string port, string dbId) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); if (string.IsNullOrEmpty(port)) throw new ArgumentException("Port cannot be null or empty", nameof(port)); if (string.IsNullOrEmpty(dbId)) throw new ArgumentException("Database ID cannot be null or empty", nameof(dbId)); _connectionString = $"Data Source=localhost:{port};Initial Catalog={dbId}"; _connection = new AdomdConnection(_connectionString); if (!int.TryParse(port, out _port) || _port < 0) _port = 0; _databaseId = dbId; } /// <summary> /// Initializes a new instance of the TabularConnection class with a pre-configured connection /// Used primarily for testing with mock connections /// </summary> /// <param name="logger">Logger for diagnostic information</param> /// <param name="connection">Pre-configured ADOMD connection</param> /// <exception cref="ArgumentNullException">Thrown when required parameters are null</exception> protected internal TabularConnection(ILogger<TabularConnection> logger, AdomdConnection connection) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _connection = connection ?? throw new ArgumentNullException(nameof(connection)); _connectionString = connection.ConnectionString; // Attempt to parse port & catalog from connection string (best-effort) try { var parts = _connectionString.Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); foreach (var p in parts) { if (p.StartsWith("Data Source=", StringComparison.OrdinalIgnoreCase)) { var ds = p.Substring("Data Source=".Length); var colonIdx = ds.LastIndexOf(':'); if (colonIdx > -1 && int.TryParse(ds[(colonIdx + 1)..], out var portParsed)) _port = portParsed; } else if (p.StartsWith("Initial Catalog=", StringComparison.OrdinalIgnoreCase)) { _databaseId = p.Substring("Initial Catalog=".Length); } } } catch { _port = 0; _databaseId = null; } } /// <inheritdoc/> public virtual async Task<IEnumerable<Dictionary<string, object?>>> ExecAsync(string query, QueryType queryType = QueryType.DAX) { if (string.IsNullOrEmpty(query)) throw new ArgumentNullException(nameof(query), "Query cannot be null or empty"); // queryType parameter is available here if specific logic is needed, // but AdomdClient typically uses CommandType.Text for both DAX and DMVs. // The distinction might be more for logging or higher-level logic if any. _logger.LogDebug("Executing query ({QueryType}): {Query}", queryType, query); await EnsureConnectionOpenAsync(); try { using var cmd = _connection.CreateCommand(); cmd.CommandText = query; cmd.CommandType = CommandType.Text; // Standard for both DAX and DMVs via AdomdClient cmd.CommandTimeout = DefaultCommandTimeout; var results = new List<Dictionary<string, object?>>(); using var reader = await Task.Run(() => cmd.ExecuteReader()).ConfigureAwait(false); while (await Task.Run(() => reader.Read()).ConfigureAwait(false)) { var row = new Dictionary<string, object?>(); for (int i = 0; i < reader.FieldCount; i++) { var name = reader.GetName(i); var value = reader.IsDBNull(i) ? DBNull.Value : reader.GetValue(i); row[name] = value; } results.Add(row); } return results; } catch (Exception ex) { _logger.LogError(ex, "Error executing {QueryType} query: {Query}", queryType, query); // Create enhanced error message with query details for better error reporting var enhancedMessage = DaxTools.CreateDetailedErrorMessage(ex, query, query, queryType); // Use standard Exception instead of McpException for application-level database errors // McpException is only for MCP protocol-level errors, not database query failures throw new Exception(enhancedMessage, ex); } } /// <inheritdoc/> public virtual async Task<IEnumerable<Dictionary<string, object?>>> ExecAsync(string query, QueryType queryType, CancellationToken cancellationToken) { if (string.IsNullOrEmpty(query)) throw new ArgumentNullException(nameof(query), "Query cannot be null or empty"); _logger.LogDebug("Executing query ({QueryType}) with cancellation: {Query}", queryType, query); cancellationToken.ThrowIfCancellationRequested(); await EnsureConnectionOpenAsync(cancellationToken); try { using var cmd = _connection.CreateCommand(); cmd.CommandText = query; cmd.CommandType = CommandType.Text; cmd.CommandTimeout = DefaultCommandTimeout; var results = new List<Dictionary<string, object?>>(); using var reader = await Task.Run(() => cmd.ExecuteReader(), cancellationToken).ConfigureAwait(false); while (await Task.Run(() => reader.Read(), cancellationToken).ConfigureAwait(false)) { cancellationToken.ThrowIfCancellationRequested(); var row = new Dictionary<string, object?>(); for (int i = 0; i < reader.FieldCount; i++) { var name = reader.GetName(i); var value = reader.IsDBNull(i) ? DBNull.Value : reader.GetValue(i); row[name] = value; } results.Add(row); } return results; } catch (Exception ex) when (ex is not OperationCanceledException) { _logger.LogError(ex, "Error executing {QueryType} query: {Query}", queryType, query); // Create enhanced error message with query details for better error reporting var enhancedMessage = DaxTools.CreateDetailedErrorMessage(ex, query, query, queryType); // Use standard Exception instead of McpException for application-level database errors // McpException is only for MCP protocol-level errors, not database query failures throw new Exception(enhancedMessage, ex); } } /// <inheritdoc/> public virtual async Task<IEnumerable<Dictionary<string, object?>>> ExecInfoAsync(string func, string filterExpr) { if (string.IsNullOrEmpty(func)) throw new ArgumentNullException(nameof(func), "Function name cannot be null or empty"); if (func.Contains(";") || func.Contains("--") || func.Contains("/*") || func.Contains("*/")) throw new ArgumentException("Function name contains invalid characters", nameof(func)); string query; if (string.IsNullOrEmpty(filterExpr)) { query = $"SELECT * FROM ${func}"; } else { FilterExpressionValidator.ValidateFilterExpression(filterExpr); query = $"SELECT * FROM ${func} WHERE {filterExpr}"; } // ExecInfoAsync implies a DMV query return await ExecAsync(query, QueryType.DMV); } /// <inheritdoc/> public virtual async Task<IEnumerable<Dictionary<string, object?>>> ExecInfoAsync(string func, string filterExpr, CancellationToken cancellationToken) { if (string.IsNullOrEmpty(func)) throw new ArgumentNullException(nameof(func), "Function name cannot be null or empty"); if (func.Contains(";") || func.Contains("--") || func.Contains("/*") || func.Contains("*/")) throw new ArgumentException("Function name contains invalid characters", nameof(func)); string query; if (string.IsNullOrEmpty(filterExpr)) { query = $"SELECT * FROM ${func}"; } else { FilterExpressionValidator.ValidateFilterExpression(filterExpr); query = $"SELECT * FROM ${func} WHERE {filterExpr}"; } // ExecInfoAsync implies a DMV query return await ExecAsync(query, QueryType.DMV, cancellationToken); } /// <summary> /// Ensures that the connection is open, opening it if necessary /// </summary> /// <returns>A task representing the asynchronous operation</returns> /// <exception cref="InvalidOperationException">Thrown when the connection cannot be opened</exception> private async Task EnsureConnectionOpenAsync() { if (_disposed) throw new ObjectDisposedException(nameof(TabularConnection), "Connection has been disposed"); try { if (_connection.State != ConnectionState.Open) { await Task.Run(() => _connection.Open()).ConfigureAwait(false); } if (_connection.State != ConnectionState.Open) throw new InvalidOperationException("Failed to open connection to PowerBI instance"); } catch (Exception ex) { _logger.LogError(ex, "Error connecting to PowerBI instance: {ConnectionString}", _connectionString.Replace(";", ", ")); throw; } } /// <summary> /// Ensures that the connection is open, opening it if necessary, with cancellation support /// </summary> /// <param name="cancellationToken">Token to monitor for cancellation requests</param> /// <returns>A task representing the asynchronous operation</returns> /// <exception cref="InvalidOperationException">Thrown when the connection cannot be opened</exception> /// <exception cref="OperationCanceledException">Thrown when the operation is canceled</exception> private async Task EnsureConnectionOpenAsync(CancellationToken cancellationToken) { if (_disposed) throw new ObjectDisposedException(nameof(TabularConnection), "Connection has been disposed"); cancellationToken.ThrowIfCancellationRequested(); try { if (_connection.State != ConnectionState.Open) { await Task.Run(() => _connection.Open(), cancellationToken).ConfigureAwait(false); } if (_connection.State != ConnectionState.Open) throw new InvalidOperationException("Failed to open connection to PowerBI instance"); } catch (Exception ex) when (ex is not OperationCanceledException) { _logger.LogError(ex, "Error connecting to PowerBI instance: {ConnectionString}", _connectionString.Replace(";", ", ")); throw; } } /// <summary> /// Disposes resources used by the connection /// </summary> public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// <summary> /// Releases the unmanaged resources used by the connection and optionally releases the managed resources /// </summary> /// <param name="disposing">True to release both managed and unmanaged resources; false to release only unmanaged resources</param> protected virtual void Dispose(bool disposing) { if (!_disposed) { if (disposing) { try { if (_connection != null) { if (_connection.State == ConnectionState.Open) { _connection.Close(); } _connection.Dispose(); } } catch (Exception ex) { _logger?.LogError(ex, "Error disposing connection"); } } _disposed = true; } } /// <summary> /// Finalizer to ensure resource cleanup /// </summary> ~TabularConnection() { Dispose(false); } /// <summary> /// Discovers available databases on the Power BI instance at the specified port /// </summary> /// <param name="port">Port number for the Power BI instance</param> /// <param name="logger">Logger instance for diagnostic information</param> /// <returns>List of available database names</returns> /// <exception cref="ArgumentException">Thrown when port is invalid</exception> /// <exception cref="InvalidOperationException">Thrown when no databases are found or connection fails</exception> public static async Task<List<string>> DiscoverDatabasesAsync(string port, ILogger<TabularConnection>? logger = null) { if (string.IsNullOrWhiteSpace(port)) throw new ArgumentException("Port cannot be null or empty", nameof(port)); if (!int.TryParse(port, out var portNumber) || portNumber < 1 || portNumber > 65535) throw new ArgumentException($"Invalid port number: {port}", nameof(port)); logger = logger ?? NullLogger<TabularConnection>.Instance; // Use a temporary connection without specifying Initial Catalog to discover databases var connectionString = $"Data Source=localhost:{port}"; try { using var connection = new AdomdConnection(connectionString); logger.LogDebug("Attempting to discover databases on port {Port}", port); await Task.Run(() => connection.Open()).ConfigureAwait(false); // Query for available catalogs/databases (using same format as InstanceDiscovery) using var cmd = connection.CreateCommand(); cmd.CommandText = "SELECT * FROM $SYSTEM.DBSCHEMA_CATALOGS"; cmd.CommandType = CommandType.Text; cmd.CommandTimeout = 30; // Shorter timeout for discovery var databases = new List<string>(); using var reader = await Task.Run(() => cmd.ExecuteReader()).ConfigureAwait(false); while (await Task.Run(() => reader.Read()).ConfigureAwait(false)) { var catalogName = reader["CATALOG_NAME"]?.ToString(); if (!string.IsNullOrWhiteSpace(catalogName)) { databases.Add(catalogName); } } logger.LogDebug("Discovered {Count} databases on port {Port}: {Databases}", databases.Count, port, string.Join(", ", databases)); if (!databases.Any()) { throw new InvalidOperationException($"No databases found on Power BI instance at port {port}"); } return databases; } catch (Exception ex) { logger.LogError(ex, "Failed to discover databases on port {Port}", port); throw new InvalidOperationException($"Could not discover databases on port {port}: {ex.Message}", ex); } } /// <summary> /// Returns a lightweight cached summary of model schema counts (tables, measures, columns). /// Uses DMV queries and caches results for a short interval. /// </summary> public async Task<SchemaSummary> GetSchemaSummaryAsync(CancellationToken ct = default) { var now = DateTime.UtcNow; var cache = _schemaCache; if (cache != null && now < _schemaCacheExpiry) { return cache; } // Simple double-checked locking lock (this) { cache = _schemaCache; if (cache != null && now < _schemaCacheExpiry) return cache; } try { ct.ThrowIfCancellationRequested(); int tableCount = await GetCountAsync("SELECT COUNT(*) AS C FROM $SYSTEM.TMSCHEMA_TABLES", ct); int measureCount = await GetCountAsync("SELECT COUNT(*) AS C FROM $SYSTEM.TMSCHEMA_MEASURES", ct); int columnCount = await GetCountAsync("SELECT COUNT(*) AS C FROM $SYSTEM.TMSCHEMA_COLUMNS", ct); DateTime? lastRefreshed = null; try { var dbRows = await ExecAsync("SELECT LASTPROCESSED FROM $SYSTEM.TMSCHEMA_DATABASES", QueryType.DMV, ct); var first = dbRows.FirstOrDefault(); if (first != null && first.TryGetValue("LASTPROCESSED", out var v) && v != null && v != DBNull.Value) { // Convert & normalize to UTC lastRefreshed = Convert.ToDateTime(v, System.Globalization.CultureInfo.InvariantCulture).ToUniversalTime(); } } catch { // Non-fatal: ignore if DMV not available or fails } var summary = new SchemaSummary(tableCount, measureCount, columnCount, lastRefreshed); lock (this) { _schemaCache = summary; _schemaCacheExpiry = DateTime.UtcNow + SchemaCacheTtl; } return summary; } catch (Exception ex) when (ex is not OperationCanceledException) { _logger.LogDebug(ex, "Failed to build schema summary"); // Return empty summary to avoid throwing for metadata endpoints var fallback = new SchemaSummary(0, 0, 0, null); lock (this) { _schemaCache = fallback; _schemaCacheExpiry = DateTime.UtcNow + TimeSpan.FromSeconds(10); } return fallback; } } private async Task<int> GetCountAsync(string dmvQuery, CancellationToken ct) { var rows = await ExecAsync(dmvQuery, QueryType.DMV, ct); var first = rows.FirstOrDefault(); if (first == null) return 0; // Try common keys object? value = null; if (first.Count == 1) { value = first.Values.FirstOrDefault(); } else if (first.TryGetValue("C", out var cVal)) { value = cVal; } if (value == null || value == DBNull.Value) return 0; try { return Convert.ToInt32(value, System.Globalization.CultureInfo.InvariantCulture); } catch { return 0; } } /// <summary> /// Creates a TabularConnection with automatic database discovery /// Connects to the Power BI instance and selects the first available database /// </summary> /// <param name="port">Port number for the Power BI instance</param> /// <param name="logger">Logger instance for diagnostic information</param> /// <returns>TabularConnection configured with the discovered database</returns> /// <exception cref="ArgumentException">Thrown when port is invalid</exception> /// <exception cref="InvalidOperationException">Thrown when no databases are found or connection fails</exception> public static async Task<TabularConnection> CreateWithDiscoveryAsync(string port, ILogger<TabularConnection>? logger = null) { logger = logger ?? NullLogger<TabularConnection>.Instance; var databases = await DiscoverDatabasesAsync(port, logger); // Use the first available database var selectedDatabase = databases.First(); logger.LogInformation("Auto-selected database '{Database}' from {Count} available databases on port {Port}", selectedDatabase, databases.Count, port); return new TabularConnection(logger, port, selectedDatabase); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jonaolden/tabular-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server