Skip to main content
Glama
database.ts (27.4 kB)
// Database client for MCP Sigmund
import { Pool } from 'pg';
import {
  simplifyTransactionDescription,
  formatCurrency,
} from './smart-formatting.js';
import { Metadata, RawData, QueryPerformanceMetrics } from './types.js';
import { APP_CONFIG } from './auth.js';
import { logInfo, logError } from './logger.js';

// Import database configuration from auth.ts
// This file should be copied from auth.example.ts and customized
import { DB_CONFIG } from './auth.js';

/** Allowed positional parameter values for the parameterized queries in this file. */
type QueryParam = string | number | null;

/**
 * Returns the connection string with its password replaced by `***` so it
 * can be written to logs. If the string cannot be parsed as a URL, a fully
 * masked placeholder is returned instead.
 */
function sanitizeConnectionString(connectionString: string): string {
  try {
    const url = new URL(connectionString);
    if (url.password) {
      url.password = '***';
    }
    return url.toString();
  } catch {
    // If URL parsing fails, return a generic message
    return 'postgresql://***:***@***:***/***';
  }
}

/**
 * Builds the structured detail object attached to error logs in this file.
 * Non-`Error` values are stringified and reported as `UnknownError`.
 */
function describeError(error: unknown) {
  const timestamp = new Date().toISOString();
  if (error instanceof Error) {
    return {
      message: error.message,
      stack: error.stack,
      name: error.name,
      timestamp,
    };
  }
  return {
    message: String(error),
    stack: undefined,
    name: 'UnknownError',
    timestamp,
  };
}

/**
 * Returns midnight UTC on the first day of the month `monthsBack` months
 * before `from`. The day is pinned to 1 before subtracting, so the result
 * never rolls over into a neighbouring month.
 */
function firstOfMonthUtc(from: Date, monthsBack: number): Date {
  return new Date(
    Date.UTC(from.getUTCFullYear(), from.getUTCMonth() - monthsBack, 1)
  );
}

/**
 * Returns midnight UTC on the same day-of-month `monthsBack` months before
 * `from`, clamped to the last day of the target month (e.g. Mar 31 minus one
 * month yields Feb 28/29 rather than overflowing into March).
 */
function sameDayMonthsAgoUtc(from: Date, monthsBack: number): Date {
  const target = firstOfMonthUtc(from, monthsBack);
  // Day 0 of the following month is the last day of the target month.
  const lastDayOfTarget = new Date(
    Date.UTC(target.getUTCFullYear(), target.getUTCMonth() + 1, 0)
  ).getUTCDate();
  target.setUTCDate(Math.min(from.getUTCDate(), lastDayOfTarget));
  return target;
}

/** Formats a date as `YYYY-MM-DD` (UTC). */
function toIsoDate(date: Date): string {
  return date.toISOString().slice(0, 10);
}

/** Returns at most the first 100 characters of a query, with `...` appended when truncated. */
function previewQuery(query: string): string {
  return `${query.substring(0, 100)}${query.length > 100 ? '...' : ''}`;
}

// Database connection pool singleton with proper error handling
let pool: Pool | null = null;
let isShuttingDown = false;

// Query performance monitoring
const queryMetrics: QueryPerformanceMetrics[] = [];
const maxMetricsHistory = 1000; // Keep last 1000 queries

/**
 * Returns the shared connection pool, creating it from `DB_CONFIG` on first
 * use and attaching an `error` listener that logs errors emitted by the pool.
 *
 * @throws Error if a shutdown is in progress or the pool cannot be constructed.
 */
export function getDatabase(): Pool {
  if (isShuttingDown) {
    throw new Error('Database is shutting down');
  }

  if (!pool) {
    try {
      pool = new Pool(DB_CONFIG);

      // Handle pool errors
      pool.on('error', err => {
        logError(
          `Unexpected error on idle client`,
          'database',
          undefined,
          describeError(err)
        );
      });

      logInfo(
        `Connected to PostgreSQL database: ${sanitizeConnectionString(DB_CONFIG.connectionString)}`,
        'database'
      );
    } catch (error) {
      logError(
        `Failed to connect to PostgreSQL database`,
        'database',
        undefined,
        describeError(error)
      );
      throw new Error('PostgreSQL database connection failed');
    }
  }

  return pool;
}

/**
 * Ends the shared pool, if one exists. A call made while another shutdown is
 * still in progress returns immediately without waiting for it.
 *
 * The shutdown flag is always cleared on exit, including when no pool was
 * ever created; otherwise a close-before-open would leave `getDatabase()`
 * throwing "Database is shutting down" forever. The pool reference is dropped
 * before `end()` is awaited so that a failed shutdown does not leave an
 * ended pool behind for later callers.
 *
 * @throws Re-throws whatever `pool.end()` rejects with, after logging it.
 */
export async function closeDatabase(): Promise<void> {
  if (isShuttingDown) {
    return; // Already shutting down
  }

  isShuttingDown = true;

  try {
    if (!pool) {
      return;
    }

    const closingPool = pool;
    pool = null;

    logInfo('Gracefully closing database connections...', 'database');
    await closingPool.end();
    logInfo('Database connections closed successfully', 'database');
  } catch (error) {
    logError(
      'Error closing database connections',
      'database',
      undefined,
      describeError(error)
    );
    throw error;
  } finally {
    isShuttingDown = false;
  }
}

/**
 * Runs `SELECT 1` on a pooled connection.
 *
 * @returns `true` if the query succeeds; `false` (after logging) on any failure.
 */
export async function checkDatabaseHealth(): Promise<boolean> {
  try {
    const db = getDatabase();
    const client = await db.connect();
    try {
      await client.query('SELECT 1');
      return true;
    } finally {
      client.release();
    }
  } catch (error) {
    logError(
      'Database health check failed',
      'database',
      undefined,
      describeError(error)
    );
    return false;
  }
}

// Connection health monitoring with detailed metrics
export interface DatabaseHealthMetrics {
  isHealthy: boolean;
  totalConnections: number;
  idleConnections: number;
  waitingClients: number;
  /** Milliseconds elapsed between the start of the check and its outcome. */
  responseTime: number;
  /** ISO-8601 timestamp taken when the result object was built. */
  lastChecked: string;
  error?: string;
}

/**
 * Runs `SELECT 1` and reports the pool counters alongside the elapsed time.
 * Failures are reported in the returned object (`isHealthy: false`, zeroed
 * counters, `error` message) rather than thrown.
 */
export async function getDatabaseHealthMetrics(): Promise<DatabaseHealthMetrics> {
  const startTime = Date.now();

  try {
    const db = getDatabase();

    // Test connection with a simple query
    const client = await db.connect();
    try {
      await client.query('SELECT 1');
      const responseTime = Date.now() - startTime;

      return {
        isHealthy: true,
        totalConnections: db.totalCount,
        idleConnections: db.idleCount,
        waitingClients: db.waitingCount,
        responseTime,
        lastChecked: new Date().toISOString(),
      };
    } finally {
      client.release();
    }
  } catch (error) {
    const responseTime = Date.now() - startTime;
    return {
      isHealthy: false,
      totalConnections: 0,
      idleConnections: 0,
      waitingClients: 0,
      responseTime,
      lastChecked: new Date().toISOString(),
      error: error instanceof Error ? error.message : String(error),
    };
  }
}

// Periodic health monitoring
let healthCheckInterval: NodeJS.Timeout | null = null;

/**
 * Starts (or restarts) a timer that collects health metrics every
 * `intervalMs` milliseconds and writes a warning to stderr for the first
 * matching condition: unhealthy, response time above 1000 ms, or more than
 * 5 waiting clients.
 *
 * @param intervalMs Polling interval in milliseconds (default 30000).
 */
export function startHealthMonitoring(intervalMs: number = 30000): void {
  if (healthCheckInterval) {
    clearInterval(healthCheckInterval);
  }

  healthCheckInterval = setInterval(async () => {
    const metrics = await getDatabaseHealthMetrics();

    if (!metrics.isHealthy) {
      console.error('⚠️ Database health check failed:', metrics.error);
    } else if (metrics.responseTime > 1000) {
      console.error(
        `⚠️ Database response time is slow: ${metrics.responseTime}ms`
      );
    } else if (metrics.waitingClients > 5) {
      console.error(
        `⚠️ High number of waiting clients: ${metrics.waitingClients}`
      );
    }
  }, intervalMs);

  console.error(
    `🔍 Started database health monitoring (interval: ${intervalMs}ms)`
  );
}

/** Stops the periodic health check if one is running; otherwise does nothing. */
export function stopHealthMonitoring(): void {
  if (healthCheckInterval) {
    clearInterval(healthCheckInterval);
    healthCheckInterval = null;
    console.error('🛑 Stopped database health monitoring');
  }
}

/**
 * Appends one entry to the in-memory query metrics history and warns on
 * stderr when the query took longer than one second. Does nothing unless
 * `APP_CONFIG.enablePerformanceMonitoring` is set.
 *
 * @param query Query text; only the first 200 characters are stored.
 * @param executionTime Elapsed time in milliseconds.
 * @param rowCount Number of rows reported for the query.
 * @param parameters Positional parameters, stored joined with `', '`.
 */
export function recordQueryPerformance(
  query: string,
  executionTime: number,
  rowCount: number,
  parameters?: (string | number | null)[]
): void {
  if (!APP_CONFIG.enablePerformanceMonitoring) {
    return;
  }

  const metric: QueryPerformanceMetrics = {
    query: query.substring(0, 200), // Truncate long queries
    execution_time: executionTime,
    row_count: rowCount,
    cache_hit: false, // We don't have caching yet
    timestamp: new Date().toISOString(),
    parameters: parameters ? { params: parameters.join(', ') } : undefined, // Log parameters
  };

  queryMetrics.push(metric);

  // Keep only the last N metrics to prevent memory issues
  if (queryMetrics.length > maxMetricsHistory) {
    queryMetrics.shift();
  }

  // Log slow queries (more than 1 second)
  if (executionTime > 1000) {
    console.error(
      `🐌 Slow query detected: ${executionTime}ms - ${query.substring(0, 100)}...`
    );
  }
}

/**
 * Summarizes the recorded query metrics: total count, mean execution time
 * (rounded to two decimals), the ten slowest queries, a fast/medium/slow
 * breakdown, and the 50 most recent entries.
 */
export function getQueryPerformanceMetrics(): {
  total_queries: number;
  average_execution_time: number;
  slowest_queries: QueryPerformanceMetrics[];
  queries_by_performance: {
    fast: number; // < 100ms
    medium: number; // 100ms - 1000ms
    slow: number; // > 1000ms
  };
  recent_metrics: QueryPerformanceMetrics[];
} {
  const totalQueries = queryMetrics.length;
  const averageExecutionTime =
    totalQueries > 0
      ? queryMetrics.reduce((sum, m) => sum + m.execution_time, 0) /
        totalQueries
      : 0;

  // Sort a copy so the chronological history is left untouched.
  const slowestQueries = [...queryMetrics]
    .sort((a, b) => b.execution_time - a.execution_time)
    .slice(0, 10);

  const performanceBreakdown = { fast: 0, medium: 0, slow: 0 };
  for (const metric of queryMetrics) {
    if (metric.execution_time < 100) {
      performanceBreakdown.fast++;
    } else if (metric.execution_time < 1000) {
      performanceBreakdown.medium++;
    } else {
      performanceBreakdown.slow++;
    }
  }

  const recentMetrics = queryMetrics.slice(-50); // Last 50 queries

  return {
    total_queries: totalQueries,
    average_execution_time: Math.round(averageExecutionTime * 100) / 100,
    slowest_queries: slowestQueries,
    queries_by_performance: performanceBreakdown,
    recent_metrics: recentMetrics,
  };
}

/** Empties the in-memory query metrics history. */
export function clearQueryMetrics(): void {
  queryMetrics.length = 0;
  console.error('🧹 Cleared query performance metrics');
}

// Database query types based on PostgreSQL schema
export interface Transaction {
  id: string;
  account_id: string;
  user_id: string;
  provider_id: string;
  external_id?: string;
  amount: number;
  currency?: string;
  local_amount?: number;
  local_currency?: string;
  exchange_rate?: number;
  date: string;
  booking_date?: string;
  value_date?: string;
  processing_date?: string;
  description?: string;
  original_description?: string;
  enhanced_description?: string;
  notes?: string;
  counterparty_name?: string;
  counterparty_iban?: string;
  counterparty_bic?: string;
  counterparty_account_number?: string;
  counterparty_sort_code?: string;
  counterparty_type?: string;
  merchant_name?: string;
  merchant_category?: string;
  merchant_category_code?: string;
  merchant_location?: string;
  merchant_address?: string;
  merchant_website?: string;
  category?: string;
  subcategory?: string;
  personal_category?: string;
  tax_category?: string;
  transaction_type?: string;
  transaction_method?: string;
  status: string;
  transaction_code?: string;
  domain_code?: string;
  family_code?: string;
  subfamily_code?: string;
  proprietary_code?: string;
  reference?: string;
  end_to_end_id?: string;
  mandate_id?: string;
  creditor_id?: string;
  debtor_id?: string;
  payment_id?: string;
  instruction_id?: string;
  check_number?: string;
  card_last4?: string;
  card_scheme?: string;
  card_type?: string;
  location_address?: string;
  location_city?: string;
  location_region?: string;
  location_postal_code?: string;
  location_country?: string;
  location_latitude?: number;
  location_longitude?: number;
  is_recurring?: boolean;
  is_subscription?: boolean;
  is_duplicate?: boolean;
  is_refund?: boolean;
  is_transfer?: boolean;
  enrichment_confidence?: number;
  running_balance?: number;
  tags?: string[];
  metadata?: Metadata;
  raw_data?: RawData;
  created_at: string;
  updated_at: string;
}

export interface Account {
  id: string;
  user_id: string;
  provider_id: string;
  external_id?: string;
  iban?: string;
  bic?: string;
  account_number?: string;
  sort_code?: string;
  routing_number?: string;
  bsb?: string;
  account_type?: string;
  account_subtype?: string;
  display_name?: string;
  product_name?: string;
  product_description?: string;
  currency?: string;
  current_balance?: number;
  available_balance?: number;
  pending_balance?: number;
  cleared_balance?: number;
  overdraft_limit?: number;
  credit_limit?: number;
  amount_due?: number;
  minimum_payment?: number;
  status: string;
  opening_date?: string;
  closure_date?: string;
  institution_name?: string;
  institution_bic?: string;
  interest_rate?: number;
  interest_type?: string;
  accrued_interest?: number;
  masked_pan?: string;
  card_expiry_date?: string;
  metadata?: Metadata;
  raw_data?: RawData;
  last_synced_at?: string;
  last_updated: string;
  created_at: string;
  updated_at: string;
}

export interface Provider {
  id: string;
  name: string;
  environment: string;
  api_version?: string;
  capabilities?: string;
  last_sync?: string;
  sync_status?: string;
  status: string;
  metadata?: any;
  created_at: string;
  updated_at: string;
}

export interface User {
  id: string;
  provider_id?: string;
  external_id?: string;
  email?: string;
  phone?: string;
  status: string;
  metadata?: any;
  created_at: string;
  updated_at: string;
}

export interface Balance {
  id: string;
  account_id: string;
  user_id: string;
  provider_id: string;
  balance_type: string;
  amount: number;
  currency?: string;
  snapshot_date: string;
  metadata?: any;
}

// Row shapes returned by the aggregate queries below.
// NOTE(review): node-postgres returns `numeric` columns as strings unless a
// custom type parser is registered — confirm whether the `number` fields fed
// by `::numeric` casts are converted somewhere before use.

/** One row of `DatabaseClient.getMonthlyData`. */
export interface MonthlySummary {
  year_month: string;
  total_income: number;
  total_expenses: number;
  net_flow: number;
  transaction_count: number;
  provider_id?: string;
}

/** One row of `DatabaseClient.getAccountBalances`. */
export interface AccountBalanceSummary {
  account_id: string;
  account_name: string;
  provider_id: string;
  provider_name: string;
  current_balance: number;
  available_balance: number;
  currency: string;
  last_updated: string;
}

/** One row of `DatabaseClient.getSpendingAnalysis`. */
export interface CategorySpending {
  category: string;
  total_amount: number;
  transaction_count: number;
  average_amount: number;
  provider_id: string;
  provider_name: string;
}

/** One row of `DatabaseClient.getProviders`. */
export interface ProviderSummary {
  provider_id: string;
  provider_name: string;
  transaction_count: number;
  account_count: number;
}

/** Result of `DatabaseClient.getFinancialOverview`. */
export interface FinancialOverview {
  total_income: number;
  total_expenses: number;
  net_cash_flow: number;
  transaction_count: number;
  account_count: number;
  providers: Array<{ id: string; name: string }>;
}

// Database query functions
export class DatabaseClient {
  private readonly pool: Pool;

  /** Binds the client to the shared pool returned by `getDatabase()`. */
  constructor() {
    this.pool = getDatabase();
  }

  /**
   * Returns a copy of `data` with the value of every property whose name
   * contains a sensitive fragment replaced by `'[REDACTED]'`. Nested objects
   * are processed recursively; arrays keep their array shape. Non-object
   * values are returned unchanged.
   */
  private sanitizeForLogging(data: unknown): unknown {
    if (typeof data !== 'object' || data === null) {
      return data;
    }

    // Spreading an array into an object would log it as {"0": ..., "1": ...}.
    if (Array.isArray(data)) {
      return data.map(item => this.sanitizeForLogging(item));
    }

    const sensitiveKeys = [
      'password',
      'token',
      'secret',
      'key',
      'auth',
      'credential',
      'iban',
      'account_number',
      'sort_code',
      'bic',
    ];

    const sanitized: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(data)) {
      const lowerKey = key.toLowerCase();
      if (sensitiveKeys.some(sensitive => lowerKey.includes(sensitive))) {
        sanitized[key] = '[REDACTED]';
      } else if (typeof value === 'object') {
        sanitized[key] = this.sanitizeForLogging(value);
      } else {
        sanitized[key] = value;
      }
    }

    return sanitized;
  }

  /**
   * Runs a parameterized query on a pooled connection and returns all rows.
   * Logs the query when `APP_CONFIG.enableQueryLogging` is set, records
   * timing via `recordQueryPerformance` on both success and failure, and
   * always releases the connection.
   *
   * @throws Re-throws the query error after logging it.
   */
  private async runAll<T>(
    query: string,
    params: QueryParam[] = []
  ): Promise<T[]> {
    const startTime = Date.now();
    const client = await this.pool.connect();

    try {
      // Log sanitized query for debugging (only in development)
      if (APP_CONFIG.enableQueryLogging) {
        console.error(`🔍 Executing query: ${previewQuery(query)}`);
        console.error(
          `📊 Query params: ${JSON.stringify(this.sanitizeForLogging(params))}`
        );
      }

      const result = await client.query(query, params);

      // Record performance metrics
      recordQueryPerformance(
        query,
        Date.now() - startTime,
        result.rowCount ?? 0,
        params
      );

      return result.rows as T[];
    } catch (error) {
      recordQueryPerformance(query, Date.now() - startTime, 0, params);

      console.error(`❌ Query failed: ${previewQuery(query)}`);
      console.error(`❌ Error:`, error);
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Runs a parameterized query and returns its first row, or `undefined`
   * when the result is empty. Logging, metrics and error handling are those
   * of `runAll`.
   */
  private async runGet<T>(
    query: string,
    params: QueryParam[] = []
  ): Promise<T | undefined> {
    const rows = await this.runAll<T>(query, params);
    return rows[0];
  }

  /**
   * Fetches transactions joined with their account and provider names,
   * newest first.
   *
   * @param params Optional filters (all combined with AND) plus paging.
   *   `limit` defaults to 100 and `offset` to 0. When `simplified` is true,
   *   each row's `description` is replaced by its simplified form and
   *   `simplified_description`, `formatted_amount` and `formatted_date`
   *   are added.
   */
  async getTransactions(
    params: {
      limit?: number;
      offset?: number;
      dateFrom?: string;
      dateTo?: string;
      category?: string;
      accountId?: string;
      providerId?: string;
      userId?: string;
      simplified?: boolean;
    } = {}
  ): Promise<Transaction[]> {
    const {
      limit = 100,
      offset = 0,
      dateFrom,
      dateTo,
      category,
      accountId,
      providerId,
      userId,
      simplified = false,
    } = params;

    let query = `
      SELECT
        t.*,
        a.display_name as account_name,
        p.name as provider_name
      FROM transactions t
      LEFT JOIN accounts a ON t.account_id = a.id
      LEFT JOIN providers p ON t.provider_id = p.id
      WHERE 1=1
    `;

    const conditions: string[] = [];
    const values: QueryParam[] = [];
    let paramIndex = 1;

    if (dateFrom) {
      conditions.push(`t.date >= $${paramIndex++}`);
      values.push(dateFrom);
    }

    if (dateTo) {
      conditions.push(`t.date <= $${paramIndex++}`);
      values.push(dateTo);
    }

    if (category) {
      conditions.push(`t.category = $${paramIndex++}`);
      values.push(category);
    }

    if (accountId) {
      conditions.push(`t.account_id = $${paramIndex++}`);
      values.push(accountId);
    }

    if (providerId) {
      conditions.push(`t.provider_id = $${paramIndex++}`);
      values.push(providerId);
    }

    if (userId) {
      conditions.push(`t.user_id = $${paramIndex++}`);
      values.push(userId);
    }

    if (conditions.length > 0) {
      query += ' AND ' + conditions.join(' AND ');
    }

    query += ` ORDER BY t.date DESC LIMIT $${paramIndex++} OFFSET $${paramIndex++}`;
    values.push(limit, offset);

    const transactions = await this.runAll<Transaction>(query, values);

    if (!simplified) {
      return transactions;
    }

    // Simplify transaction data if requested
    return transactions.map(transaction => {
      const simplifiedDescription = simplifyTransactionDescription(
        transaction.description ?? ''
      );

      return {
        ...transaction,
        description: simplifiedDescription,
        // Add simplified fields for better user experience
        simplified_description: simplifiedDescription,
        formatted_amount: formatCurrency(
          parseFloat(transaction.amount.toString()),
          transaction.currency || 'EUR'
        ),
        formatted_date: new Date(transaction.date).toLocaleDateString('en-US', {
          year: 'numeric',
          month: 'short',
          day: 'numeric',
        }),
      };
    });
  }

  /**
   * Aggregates transactions per calendar month and provider, newest month
   * first.
   *
   * @param params `yearMonth` (`YYYY-MM`) selects a single month; otherwise
   *   rows dated on or after the first day of the month `months` months ago
   *   (default 12, computed in UTC) are included. `providerId` and `userId`
   *   narrow the result further.
   */
  async getMonthlyData(
    params: {
      months?: number;
      yearMonth?: string;
      providerId?: string;
      userId?: string;
    } = {}
  ): Promise<MonthlySummary[]> {
    const { months = 12, yearMonth, providerId, userId } = params;

    let query = `
      SELECT
        TO_CHAR(t.date, 'YYYY-MM') as year_month,
        t.provider_id,
        SUM(CASE WHEN t.amount > 0 THEN t.amount ELSE 0 END)::numeric as total_income,
        SUM(CASE WHEN t.amount < 0 THEN ABS(t.amount) ELSE 0 END)::numeric as total_expenses,
        SUM(t.amount)::numeric as net_flow,
        COUNT(*)::integer as transaction_count
      FROM transactions t
      WHERE 1=1
    `;

    const conditions: string[] = [];
    const values: QueryParam[] = [];
    let paramIndex = 1;

    if (yearMonth) {
      conditions.push(`TO_CHAR(t.date, 'YYYY-MM') = $${paramIndex++}`);
      values.push(yearMonth);
    } else {
      // Get last N months. The day is pinned to the 1st before subtracting so
      // that running on the 29th-31st cannot roll into the wrong month.
      conditions.push(`t.date >= $${paramIndex++}`);
      values.push(toIsoDate(firstOfMonthUtc(new Date(), months)));
    }

    if (providerId) {
      conditions.push(`t.provider_id = $${paramIndex++}`);
      values.push(providerId);
    }

    if (userId) {
      conditions.push(`t.user_id = $${paramIndex++}`);
      values.push(userId);
    }

    if (conditions.length > 0) {
      query += ' AND ' + conditions.join(' AND ');
    }

    query +=
      " GROUP BY TO_CHAR(t.date, 'YYYY-MM'), t.provider_id ORDER BY year_month DESC";

    return this.runAll<MonthlySummary>(query, values);
  }

  /**
   * Fetches active accounts with their provider name, ordered by current
   * balance (highest first), optionally filtered by provider, user and
   * account type.
   */
  async getAccounts(
    params: {
      providerId?: string;
      userId?: string;
      accountType?: string;
    } = {}
  ): Promise<Account[]> {
    const { providerId, userId, accountType } = params;

    let query = `
      SELECT
        a.*,
        p.name as provider_name
      FROM accounts a
      LEFT JOIN providers p ON a.provider_id = p.id
      WHERE a.status = 'active'
    `;

    const conditions: string[] = [];
    const values: QueryParam[] = [];
    let paramIndex = 1;

    if (providerId) {
      conditions.push(`a.provider_id = $${paramIndex++}`);
      values.push(providerId);
    }

    if (userId) {
      conditions.push(`a.user_id = $${paramIndex++}`);
      values.push(userId);
    }

    if (accountType) {
      conditions.push(`a.account_type = $${paramIndex++}`);
      values.push(accountType);
    }

    if (conditions.length > 0) {
      query += ' AND ' + conditions.join(' AND ');
    }

    query += ' ORDER BY a.current_balance DESC';

    return this.runAll<Account>(query, values);
  }

  /**
   * Fetches the balance columns of active accounts, ordered by current
   * balance (highest first), optionally filtered by provider and user.
   */
  async getAccountBalances(
    params: {
      providerId?: string;
      userId?: string;
    } = {}
  ): Promise<AccountBalanceSummary[]> {
    const { providerId, userId } = params;

    let query = `
      SELECT
        a.id as account_id,
        a.display_name as account_name,
        a.provider_id,
        p.name as provider_name,
        a.current_balance::numeric as current_balance,
        a.available_balance::numeric as available_balance,
        a.currency,
        a.last_updated
      FROM accounts a
      LEFT JOIN providers p ON a.provider_id = p.id
      WHERE a.status = 'active'
    `;

    const conditions: string[] = [];
    const values: QueryParam[] = [];
    let paramIndex = 1;

    if (providerId) {
      conditions.push(`a.provider_id = $${paramIndex++}`);
      values.push(providerId);
    }

    if (userId) {
      conditions.push(`a.user_id = $${paramIndex++}`);
      values.push(userId);
    }

    if (conditions.length > 0) {
      query += ' AND ' + conditions.join(' AND ');
    }

    query += ' ORDER BY a.current_balance DESC';

    return this.runAll<AccountBalanceSummary>(query, values);
  }

  /**
   * Aggregates categorized negative-amount transactions per category and
   * provider, largest total first.
   *
   * @param params `months` (default 3) sets how far back to look: rows dated
   *   on or after the same day-of-month that many months ago (UTC, clamped
   *   to the end of shorter months) are included. `category`, `providerId`
   *   and `userId` narrow the result further.
   */
  async getSpendingAnalysis(
    params: {
      months?: number;
      category?: string;
      providerId?: string;
      userId?: string;
    } = {}
  ): Promise<CategorySpending[]> {
    const { months = 3, category, providerId, userId } = params;

    const cutoffDateStr = toIsoDate(sameDayMonthsAgoUtc(new Date(), months));

    let query = `
      SELECT
        t.category,
        t.provider_id,
        p.name as provider_name,
        SUM(ABS(t.amount))::numeric as total_amount,
        COUNT(*)::integer as transaction_count,
        AVG(ABS(t.amount))::numeric as average_amount
      FROM transactions t
      LEFT JOIN providers p ON t.provider_id = p.id
      WHERE t.date >= $1 AND t.amount < 0 AND t.category IS NOT NULL
    `;

    const values: QueryParam[] = [cutoffDateStr];
    let paramIndex = 2;

    if (category) {
      query += ` AND t.category = $${paramIndex++}`;
      values.push(category);
    }

    if (providerId) {
      query += ` AND t.provider_id = $${paramIndex++}`;
      values.push(providerId);
    }

    if (userId) {
      query += ` AND t.user_id = $${paramIndex++}`;
      values.push(userId);
    }

    query +=
      ' GROUP BY t.category, t.provider_id, p.name ORDER BY total_amount DESC';

    return this.runAll<CategorySpending>(query, values);
  }

  /**
   * Lists active providers with their distinct transaction and account
   * counts, ordered by transaction count (highest first).
   */
  async getProviders(): Promise<ProviderSummary[]> {
    const query = `
      SELECT
        p.id as provider_id,
        p.name as provider_name,
        COUNT(DISTINCT t.id)::integer as transaction_count,
        COUNT(DISTINCT a.id)::integer as account_count
      FROM providers p
      LEFT JOIN transactions t ON p.id = t.provider_id
      LEFT JOIN accounts a ON p.id = a.provider_id
      WHERE p.status = 'active'
      GROUP BY p.id, p.name
      ORDER BY transaction_count DESC
    `;

    return this.runAll<ProviderSummary>(query);
  }

  /**
   * Builds a summary from three queries run one after another: transaction
   * totals, active-account count, and the provider list from
   * `getProviders()`. `providerId` and `userId` filter the first two
   * queries only; the provider list is never filtered. Missing aggregates
   * fall back to 0.
   */
  async getFinancialOverview(
    params: {
      providerId?: string;
      userId?: string;
    } = {}
  ): Promise<FinancialOverview> {
    const { providerId, userId } = params;

    // Get transaction summary
    let transactionQuery = `
      SELECT
        SUM(CASE WHEN amount > 0 THEN amount ELSE 0 END)::numeric as total_income,
        SUM(CASE WHEN amount < 0 THEN ABS(amount) ELSE 0 END)::numeric as total_expenses,
        SUM(amount)::numeric as net_cash_flow,
        COUNT(*)::integer as transaction_count
      FROM transactions t
      WHERE 1=1
    `;

    const transactionValues: QueryParam[] = [];
    let paramIndex = 1;

    if (providerId) {
      transactionQuery += ` AND t.provider_id = $${paramIndex++}`;
      transactionValues.push(providerId);
    }

    if (userId) {
      transactionQuery += ` AND t.user_id = $${paramIndex++}`;
      transactionValues.push(userId);
    }

    // Get account count
    let accountQuery = `
      SELECT COUNT(*)::integer as account_count
      FROM accounts a
      WHERE a.status = 'active'
    `;

    const accountValues: QueryParam[] = [];
    let accountParamIndex = 1;

    if (providerId) {
      accountQuery += ` AND a.provider_id = $${accountParamIndex++}`;
      accountValues.push(providerId);
    }

    if (userId) {
      accountQuery += ` AND a.user_id = $${accountParamIndex++}`;
      accountValues.push(userId);
    }

    // SUM() yields NULL when no rows match, hence the nullable fields.
    const transactionData = await this.runGet<{
      total_income: number | null;
      total_expenses: number | null;
      net_cash_flow: number | null;
      transaction_count: number | null;
    }>(transactionQuery, transactionValues);
    const accountData = await this.runGet<{ account_count: number | null }>(
      accountQuery,
      accountValues
    );
    const providers = await this.getProviders();

    return {
      total_income: transactionData?.total_income ?? 0,
      total_expenses: transactionData?.total_expenses ?? 0,
      net_cash_flow: transactionData?.net_cash_flow ?? 0,
      transaction_count: transactionData?.transaction_count ?? 0,
      account_count: accountData?.account_count ?? 0,
      providers: providers.map(p => ({
        id: p.provider_id,
        name: p.provider_name,
      })),
    };
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/radup/mcp-sigmund'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.