// Database client for MCP Sigmund
import { Pool } from 'pg';
import {
simplifyTransactionDescription,
formatCurrency,
} from './smart-formatting.js';
import {
Metadata,
RawData,
QueryPerformanceMetrics,
} from './types.js';
import { APP_CONFIG } from './auth.js';
import { logInfo, logError } from './logger.js';
// Import database configuration from auth.ts
// This file should be copied from auth.example.ts and customized
import { DB_CONFIG } from './auth.js';
// Function to sanitize connection string for logging
function sanitizeConnectionString(connectionString: string): string {
try {
const url = new URL(connectionString);
if (url.password) {
url.password = '***';
}
return url.toString();
} catch {
// If URL parsing fails, return a generic message
return 'postgresql://***:***@***:***/***';
}
}
// Database connection pool singleton with proper error handling
let pool: Pool | null = null;
let isShuttingDown = false;
// Query performance monitoring
const queryMetrics: QueryPerformanceMetrics[] = [];
const maxMetricsHistory = 1000; // Keep last 1000 queries
// Initialize database connection pool with proper singleton pattern
export function getDatabase(): Pool {
if (isShuttingDown) {
throw new Error('Database is shutting down');
}
if (!pool) {
try {
pool = new Pool(DB_CONFIG);
// Handle pool errors
pool.on('error', err => {
logError(`Unexpected error on idle client`, 'database', undefined, {
message: err.message,
stack: err.stack,
name: err.name,
timestamp: new Date().toISOString(),
});
});
logInfo(
`Connected to PostgreSQL database: ${sanitizeConnectionString(DB_CONFIG.connectionString)}`,
'database'
);
} catch (error) {
logError(
`Failed to connect to PostgreSQL database`,
'database',
undefined,
{
message: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
name: error instanceof Error ? error.name : 'UnknownError',
timestamp: new Date().toISOString(),
}
);
throw new Error('PostgreSQL database connection failed');
}
}
return pool;
}
// Close database connection pool with graceful shutdown
export async function closeDatabase(): Promise<void> {
if (isShuttingDown) {
return; // Already shutting down
}
isShuttingDown = true;
if (pool) {
try {
logInfo('Gracefully closing database connections...', 'database');
await pool.end();
pool = null;
logInfo('Database connections closed successfully', 'database');
} catch (error) {
logError('Error closing database connections', 'database', undefined, {
message: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
name: error instanceof Error ? error.name : 'UnknownError',
timestamp: new Date().toISOString(),
});
throw error;
} finally {
isShuttingDown = false;
}
}
}
// Health check for database connection
export async function checkDatabaseHealth(): Promise<boolean> {
try {
const pool = getDatabase();
const client = await pool.connect();
try {
await client.query('SELECT 1');
return true;
} finally {
client.release();
}
} catch (error) {
logError('Database health check failed', 'database', undefined, {
message: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
name: error instanceof Error ? error.name : 'UnknownError',
timestamp: new Date().toISOString(),
});
return false;
}
}
// Connection health monitoring with detailed metrics
export interface DatabaseHealthMetrics {
isHealthy: boolean;
totalConnections: number;
idleConnections: number;
waitingClients: number;
responseTime: number;
lastChecked: string;
error?: string;
}
export async function getDatabaseHealthMetrics(): Promise<DatabaseHealthMetrics> {
const startTime = Date.now();
try {
const pool = getDatabase();
// Test connection with a simple query
const client = await pool.connect();
try {
await client.query('SELECT 1');
const responseTime = Date.now() - startTime;
return {
isHealthy: true,
totalConnections: pool.totalCount,
idleConnections: pool.idleCount,
waitingClients: pool.waitingCount,
responseTime,
lastChecked: new Date().toISOString(),
};
} finally {
client.release();
}
} catch (error) {
const responseTime = Date.now() - startTime;
return {
isHealthy: false,
totalConnections: 0,
idleConnections: 0,
waitingClients: 0,
responseTime,
lastChecked: new Date().toISOString(),
error: error instanceof Error ? error.message : String(error),
};
}
}
// Periodic health monitoring
let healthCheckInterval: NodeJS.Timeout | null = null;
export function startHealthMonitoring(intervalMs: number = 30000): void {
if (healthCheckInterval) {
clearInterval(healthCheckInterval);
}
healthCheckInterval = setInterval(async () => {
const metrics = await getDatabaseHealthMetrics();
if (!metrics.isHealthy) {
console.error('⚠️ Database health check failed:', metrics.error);
} else if (metrics.responseTime > 1000) {
console.error(
`⚠️ Database response time is slow: ${metrics.responseTime}ms`
);
} else if (metrics.waitingClients > 5) {
console.error(
`⚠️ High number of waiting clients: ${metrics.waitingClients}`
);
}
}, intervalMs);
console.error(
`🔍 Started database health monitoring (interval: ${intervalMs}ms)`
);
}
export function stopHealthMonitoring(): void {
if (healthCheckInterval) {
clearInterval(healthCheckInterval);
healthCheckInterval = null;
console.error('🛑 Stopped database health monitoring');
}
}
// Query performance monitoring functions
export function recordQueryPerformance(
query: string,
executionTime: number,
rowCount: number,
parameters?: (string | number | null)[]
): void {
if (!APP_CONFIG.enablePerformanceMonitoring) {
return;
}
const metric: QueryPerformanceMetrics = {
query: query.substring(0, 200), // Truncate long queries
execution_time: executionTime,
row_count: rowCount,
cache_hit: false, // We don't have caching yet
timestamp: new Date().toISOString(),
parameters: parameters ? { params: parameters.join(', ') } : undefined, // Log parameters
};
queryMetrics.push(metric);
// Keep only the last N metrics to prevent memory issues
if (queryMetrics.length > maxMetricsHistory) {
queryMetrics.shift();
}
// Log slow queries
if (executionTime > 1000) {
// Queries taking more than 1 second
console.error(
`🐌 Slow query detected: ${executionTime}ms - ${query.substring(0, 100)}...`
);
}
}
export function getQueryPerformanceMetrics(): {
total_queries: number;
average_execution_time: number;
slowest_queries: QueryPerformanceMetrics[];
queries_by_performance: {
fast: number; // < 100ms
medium: number; // 100ms - 1000ms
slow: number; // > 1000ms
};
recent_metrics: QueryPerformanceMetrics[];
} {
const totalQueries = queryMetrics.length;
const averageExecutionTime =
totalQueries > 0
? queryMetrics.reduce((sum, m) => sum + m.execution_time, 0) /
totalQueries
: 0;
const slowestQueries = [...queryMetrics]
.sort((a, b) => b.execution_time - a.execution_time)
.slice(0, 10);
const performanceBreakdown = queryMetrics.reduce(
(acc, metric) => {
if (metric.execution_time < 100) {
acc.fast++;
} else if (metric.execution_time < 1000) {
acc.medium++;
} else {
acc.slow++;
}
return acc;
},
{ fast: 0, medium: 0, slow: 0 }
);
const recentMetrics = queryMetrics.slice(-50); // Last 50 queries
return {
total_queries: totalQueries,
average_execution_time: Math.round(averageExecutionTime * 100) / 100,
slowest_queries: slowestQueries,
queries_by_performance: performanceBreakdown,
recent_metrics: recentMetrics,
};
}
export function clearQueryMetrics(): void {
queryMetrics.length = 0;
console.error('🧹 Cleared query performance metrics');
}
// Database query types based on PostgreSQL schema
export interface Transaction {
id: string;
account_id: string;
user_id: string;
provider_id: string;
external_id?: string;
amount: number;
currency?: string;
local_amount?: number;
local_currency?: string;
exchange_rate?: number;
date: string;
booking_date?: string;
value_date?: string;
processing_date?: string;
description?: string;
original_description?: string;
enhanced_description?: string;
notes?: string;
counterparty_name?: string;
counterparty_iban?: string;
counterparty_bic?: string;
counterparty_account_number?: string;
counterparty_sort_code?: string;
counterparty_type?: string;
merchant_name?: string;
merchant_category?: string;
merchant_category_code?: string;
merchant_location?: string;
merchant_address?: string;
merchant_website?: string;
category?: string;
subcategory?: string;
personal_category?: string;
tax_category?: string;
transaction_type?: string;
transaction_method?: string;
status: string;
transaction_code?: string;
domain_code?: string;
family_code?: string;
subfamily_code?: string;
proprietary_code?: string;
reference?: string;
end_to_end_id?: string;
mandate_id?: string;
creditor_id?: string;
debtor_id?: string;
payment_id?: string;
instruction_id?: string;
check_number?: string;
card_last4?: string;
card_scheme?: string;
card_type?: string;
location_address?: string;
location_city?: string;
location_region?: string;
location_postal_code?: string;
location_country?: string;
location_latitude?: number;
location_longitude?: number;
is_recurring?: boolean;
is_subscription?: boolean;
is_duplicate?: boolean;
is_refund?: boolean;
is_transfer?: boolean;
enrichment_confidence?: number;
running_balance?: number;
tags?: string[];
metadata?: Metadata;
raw_data?: RawData;
created_at: string;
updated_at: string;
}
export interface Account {
id: string;
user_id: string;
provider_id: string;
external_id?: string;
iban?: string;
bic?: string;
account_number?: string;
sort_code?: string;
routing_number?: string;
bsb?: string;
account_type?: string;
account_subtype?: string;
display_name?: string;
product_name?: string;
product_description?: string;
currency?: string;
current_balance?: number;
available_balance?: number;
pending_balance?: number;
cleared_balance?: number;
overdraft_limit?: number;
credit_limit?: number;
amount_due?: number;
minimum_payment?: number;
status: string;
opening_date?: string;
closure_date?: string;
institution_name?: string;
institution_bic?: string;
interest_rate?: number;
interest_type?: string;
accrued_interest?: number;
masked_pan?: string;
card_expiry_date?: string;
metadata?: Metadata;
raw_data?: RawData;
last_synced_at?: string;
last_updated: string;
created_at: string;
updated_at: string;
}
export interface Provider {
id: string;
name: string;
environment: string;
api_version?: string;
capabilities?: string;
last_sync?: string;
sync_status?: string;
status: string;
metadata?: any;
created_at: string;
updated_at: string;
}
export interface User {
id: string;
provider_id?: string;
external_id?: string;
email?: string;
phone?: string;
status: string;
metadata?: any;
created_at: string;
updated_at: string;
}
export interface Balance {
id: string;
account_id: string;
user_id: string;
provider_id: string;
balance_type: string;
amount: number;
currency?: string;
snapshot_date: string;
metadata?: any;
}
// Database query functions
export class DatabaseClient {
private pool: Pool;
constructor() {
this.pool = getDatabase();
}
// Sanitize sensitive data for logging
private sanitizeForLogging(data: unknown): unknown {
if (typeof data !== 'object' || data === null) {
return data;
}
const sensitiveKeys = [
'password',
'token',
'secret',
'key',
'auth',
'credential',
'iban',
'account_number',
'sort_code',
'bic',
];
const sanitized: Record<string, any> = { ...(data as Record<string, any>) };
for (const key in sanitized) {
if (
sensitiveKeys.some(sensitive => key.toLowerCase().includes(sensitive))
) {
sanitized[key] = '[REDACTED]';
} else if (typeof sanitized[key] === 'object') {
sanitized[key] = this.sanitizeForLogging(sanitized[key]);
}
}
return sanitized;
}
// Helper for running all queries with sanitized logging and performance monitoring
private async runAll<T>(
query: string,
params: (string | number | null)[] = []
): Promise<T[]> {
const startTime = Date.now();
const client = await this.pool.connect();
try {
// Log sanitized query for debugging (only in development)
if (APP_CONFIG.enableQueryLogging) {
console.error(
`🔍 Executing query: ${query.substring(0, 100)}${query.length > 100 ? '...' : ''}`
);
console.error(
`📊 Query params: ${JSON.stringify(this.sanitizeForLogging(params))}`
);
}
const result = await client.query(query, params);
const executionTime = Date.now() - startTime;
// Record performance metrics
recordQueryPerformance(
query,
executionTime,
result.rowCount || 0,
params
);
return result.rows as T[];
} catch (error) {
const executionTime = Date.now() - startTime;
recordQueryPerformance(query, executionTime, 0, params);
console.error(
`❌ Query failed: ${query.substring(0, 100)}${query.length > 100 ? '...' : ''}`
);
console.error(`❌ Error:`, error);
throw error;
} finally {
client.release();
}
}
// Helper for running get queries with sanitized logging and performance monitoring
private async runGet<T>(
query: string,
params: (string | number | null)[] = []
): Promise<T | undefined> {
const startTime = Date.now();
const client = await this.pool.connect();
try {
// Log sanitized query for debugging (only in development)
if (APP_CONFIG.enableQueryLogging) {
console.error(
`🔍 Executing query: ${query.substring(0, 100)}${query.length > 100 ? '...' : ''}`
);
console.error(
`📊 Query params: ${JSON.stringify(this.sanitizeForLogging(params))}`
);
}
const result = await client.query(query, params);
const executionTime = Date.now() - startTime;
// Record performance metrics
recordQueryPerformance(
query,
executionTime,
result.rowCount || 0,
params
);
return result.rows[0] as T | undefined;
} catch (error) {
const executionTime = Date.now() - startTime;
recordQueryPerformance(query, executionTime, 0, params);
console.error(
`❌ Query failed: ${query.substring(0, 100)}${query.length > 100 ? '...' : ''}`
);
console.error(`❌ Error:`, error);
throw error;
} finally {
client.release();
}
}
// Get all transactions with optional filtering
async getTransactions(
params: {
limit?: number;
offset?: number;
dateFrom?: string;
dateTo?: string;
category?: string;
accountId?: string;
providerId?: string;
userId?: string;
simplified?: boolean;
} = {}
): Promise<Transaction[]> {
const {
limit = 100,
offset = 0,
dateFrom,
dateTo,
category,
accountId,
providerId,
userId,
simplified = false,
} = params;
let query = `
SELECT t.*, a.display_name as account_name, p.name as provider_name
FROM transactions t
LEFT JOIN accounts a ON t.account_id = a.id
LEFT JOIN providers p ON t.provider_id = p.id
WHERE 1=1
`;
const conditions: string[] = [];
const values: (string | number | null)[] = [];
let paramIndex = 1;
if (dateFrom) {
conditions.push(`t.date >= $${paramIndex++}`);
values.push(dateFrom);
}
if (dateTo) {
conditions.push(`t.date <= $${paramIndex++}`);
values.push(dateTo);
}
if (category) {
conditions.push(`t.category = $${paramIndex++}`);
values.push(category);
}
if (accountId) {
conditions.push(`t.account_id = $${paramIndex++}`);
values.push(accountId);
}
if (providerId) {
conditions.push(`t.provider_id = $${paramIndex++}`);
values.push(providerId);
}
if (userId) {
conditions.push(`t.user_id = $${paramIndex++}`);
values.push(userId);
}
if (conditions.length > 0) {
query += ' AND ' + conditions.join(' AND ');
}
query += ` ORDER BY t.date DESC LIMIT $${paramIndex++} OFFSET $${paramIndex++}`;
values.push(limit, offset);
const transactions = await this.runAll<Transaction>(query, values);
// Simplify transaction data if requested
if (simplified) {
return transactions.map(transaction => ({
...transaction,
description: simplifyTransactionDescription(
transaction.description || ''
),
// Add simplified fields for better user experience
simplified_description: simplifyTransactionDescription(
transaction.description || ''
),
formatted_amount: formatCurrency(
parseFloat(transaction.amount.toString()),
transaction.currency || 'EUR'
),
formatted_date: new Date(transaction.date).toLocaleDateString('en-US', {
year: 'numeric',
month: 'short',
day: 'numeric',
}),
}));
}
return transactions;
}
// Get monthly aggregated data (calculated from transactions)
async getMonthlyData(
params: {
months?: number;
yearMonth?: string;
providerId?: string;
userId?: string;
} = {}
): Promise<
Array<{
year_month: string;
total_income: number;
total_expenses: number;
net_flow: number;
transaction_count: number;
provider_id?: string;
}>
> {
const { months = 12, yearMonth, providerId, userId } = params;
let query = `
SELECT
TO_CHAR(t.date, 'YYYY-MM') as year_month,
t.provider_id,
SUM(CASE WHEN t.amount > 0 THEN t.amount ELSE 0 END)::numeric as total_income,
SUM(CASE WHEN t.amount < 0 THEN ABS(t.amount) ELSE 0 END)::numeric as total_expenses,
SUM(t.amount)::numeric as net_flow,
COUNT(*)::integer as transaction_count
FROM transactions t
WHERE 1=1
`;
const conditions: string[] = [];
const values: (string | number | null)[] = [];
let paramIndex = 1;
if (yearMonth) {
conditions.push(`TO_CHAR(t.date, 'YYYY-MM') = $${paramIndex++}`);
values.push(yearMonth);
} else {
// Get last N months
conditions.push(`t.date >= $${paramIndex++}`);
const cutoffDate = new Date();
cutoffDate.setMonth(cutoffDate.getMonth() - months);
const cutoffMonth = cutoffDate.toISOString().slice(0, 7) + '-01';
values.push(cutoffMonth);
}
if (providerId) {
conditions.push(`t.provider_id = $${paramIndex++}`);
values.push(providerId);
}
if (userId) {
conditions.push(`t.user_id = $${paramIndex++}`);
values.push(userId);
}
if (conditions.length > 0) {
query += ' AND ' + conditions.join(' AND ');
}
query +=
" GROUP BY TO_CHAR(t.date, 'YYYY-MM'), t.provider_id ORDER BY year_month DESC";
return this.runAll<any>(query, values);
}
// Get accounts with balances
async getAccounts(
params: {
providerId?: string;
userId?: string;
accountType?: string;
} = {}
): Promise<Account[]> {
const { providerId, userId, accountType } = params;
let query = `
SELECT a.*, p.name as provider_name
FROM accounts a
LEFT JOIN providers p ON a.provider_id = p.id
WHERE a.status = 'active'
`;
const conditions: string[] = [];
const values: (string | number | null)[] = [];
let paramIndex = 1;
if (providerId) {
conditions.push(`a.provider_id = $${paramIndex++}`);
values.push(providerId);
}
if (userId) {
conditions.push(`a.user_id = $${paramIndex++}`);
values.push(userId);
}
if (accountType) {
conditions.push(`a.account_type = $${paramIndex++}`);
values.push(accountType);
}
if (conditions.length > 0) {
query += ' AND ' + conditions.join(' AND ');
}
query += ' ORDER BY a.current_balance DESC';
return this.runAll<Account>(query, values);
}
// Get account balances
async getAccountBalances(
params: {
providerId?: string;
userId?: string;
} = {}
): Promise<
Array<{
account_id: string;
account_name: string;
provider_id: string;
provider_name: string;
current_balance: number;
available_balance: number;
currency: string;
last_updated: string;
}>
> {
const { providerId, userId } = params;
let query = `
SELECT
a.id as account_id,
a.display_name as account_name,
a.provider_id,
p.name as provider_name,
a.current_balance::numeric as current_balance,
a.available_balance::numeric as available_balance,
a.currency,
a.last_updated
FROM accounts a
LEFT JOIN providers p ON a.provider_id = p.id
WHERE a.status = 'active'
`;
const conditions: string[] = [];
const values: (string | number | null)[] = [];
let paramIndex = 1;
if (providerId) {
conditions.push(`a.provider_id = $${paramIndex++}`);
values.push(providerId);
}
if (userId) {
conditions.push(`a.user_id = $${paramIndex++}`);
values.push(userId);
}
if (conditions.length > 0) {
query += ' AND ' + conditions.join(' AND ');
}
query += ' ORDER BY a.current_balance DESC';
return this.runAll<any>(query, values);
}
// Get spending analysis by category
async getSpendingAnalysis(
params: {
months?: number;
category?: string;
providerId?: string;
userId?: string;
} = {}
): Promise<
Array<{
category: string;
total_amount: number;
transaction_count: number;
average_amount: number;
provider_id: string;
provider_name: string;
}>
> {
const { months = 3, category, providerId, userId } = params;
const cutoffDate = new Date();
cutoffDate.setMonth(cutoffDate.getMonth() - months);
const cutoffDateStr = cutoffDate.toISOString().slice(0, 10);
let query = `
SELECT
t.category,
t.provider_id,
p.name as provider_name,
SUM(ABS(t.amount))::numeric as total_amount,
COUNT(*)::integer as transaction_count,
AVG(ABS(t.amount))::numeric as average_amount
FROM transactions t
LEFT JOIN providers p ON t.provider_id = p.id
WHERE t.date >= $1 AND t.amount < 0 AND t.category IS NOT NULL
`;
const values: any[] = [cutoffDateStr];
let paramIndex = 2;
if (category) {
query += ` AND t.category = $${paramIndex++}`;
values.push(category);
}
if (providerId) {
query += ` AND t.provider_id = $${paramIndex++}`;
values.push(providerId);
}
if (userId) {
query += ` AND t.user_id = $${paramIndex++}`;
values.push(userId);
}
query +=
' GROUP BY t.category, t.provider_id, p.name ORDER BY total_amount DESC';
return this.runAll<any>(query, values);
}
// Get available providers
async getProviders(): Promise<
Array<{
provider_id: string;
provider_name: string;
transaction_count: number;
account_count: number;
}>
> {
const query = `
SELECT
p.id as provider_id,
p.name as provider_name,
COUNT(DISTINCT t.id)::integer as transaction_count,
COUNT(DISTINCT a.id)::integer as account_count
FROM providers p
LEFT JOIN transactions t ON p.id = t.provider_id
LEFT JOIN accounts a ON p.id = a.provider_id
WHERE p.status = 'active'
GROUP BY p.id, p.name
ORDER BY transaction_count DESC
`;
return this.runAll<any>(query);
}
// Get financial overview
async getFinancialOverview(
params: {
providerId?: string;
userId?: string;
} = {}
): Promise<{
total_income: number;
total_expenses: number;
net_cash_flow: number;
transaction_count: number;
account_count: number;
providers: Array<{ id: string; name: string }>;
}> {
const { providerId, userId } = params;
// Get transaction summary
let transactionQuery = `
SELECT
SUM(CASE WHEN amount > 0 THEN amount ELSE 0 END)::numeric as total_income,
SUM(CASE WHEN amount < 0 THEN ABS(amount) ELSE 0 END)::numeric as total_expenses,
SUM(amount)::numeric as net_cash_flow,
COUNT(*)::integer as transaction_count
FROM transactions t
WHERE 1=1
`;
const transactionValues: (string | number | null)[] = [];
let paramIndex = 1;
if (providerId) {
transactionQuery += ` AND t.provider_id = $${paramIndex++}`;
transactionValues.push(providerId);
}
if (userId) {
transactionQuery += ` AND t.user_id = $${paramIndex++}`;
transactionValues.push(userId);
}
// Get account count
let accountQuery = `
SELECT COUNT(*)::integer as account_count
FROM accounts a
WHERE a.status = 'active'
`;
const accountValues: (string | number | null)[] = [];
let accountParamIndex = 1;
if (providerId) {
accountQuery += ` AND a.provider_id = $${accountParamIndex++}`;
accountValues.push(providerId);
}
if (userId) {
accountQuery += ` AND a.user_id = $${accountParamIndex++}`;
accountValues.push(userId);
}
const transactionData = await this.runGet<any>(
transactionQuery,
transactionValues
);
const accountData = await this.runGet<any>(accountQuery, accountValues);
const providers = await this.getProviders();
return {
total_income: transactionData?.total_income || 0,
total_expenses: transactionData?.total_expenses || 0,
net_cash_flow: transactionData?.net_cash_flow || 0,
transaction_count: transactionData?.transaction_count || 0,
account_count: accountData?.account_count || 0,
providers: providers.map(p => ({
id: p.provider_id,
name: p.provider_name,
})),
};
}
}