Skip to main content
Glama
data-quality.ts18.7 kB
import { z } from 'zod'; import { DatabaseConnection } from '../types.js'; import { Logger } from '../utils/logger.js'; import { executeQuery } from '../utils/database.js'; import { escapeIdentifier, sanitizeIdentifier, validateCondition } from '../utils/sanitize.js'; const FindDuplicatesSchema = z.object({ table: z.string(), columns: z.array(z.string()), schema: z.string().optional().default('public'), limit: z.number().optional().default(100), minCount: z.number().optional().default(2), includeRows: z.boolean().optional().default(true) }); const FindMissingValuesSchema = z.object({ table: z.string(), columns: z.array(z.string()), schema: z.string().optional().default('public'), includeRows: z.boolean().optional().default(true), limit: z.number().optional().default(100) }); const FindOrphansSchema = z.object({ table: z.string(), foreignKey: z.string(), referenceTable: z.string(), referenceColumn: z.string(), schema: z.string().optional().default('public'), referenceSchema: z.string().optional().default('public'), limit: z.number().optional().default(100) }); const CheckConstraintViolationsSchema = z.object({ table: z.string(), condition: z.string().describe('SQL boolean expression to check, e.g., "email IS NOT NULL"'), constraintName: z.string().optional().describe('Name for the constraint'), schema: z.string().optional().default('public') }); const AnalyzeTypeConsistencySchema = z.object({ table: z.string(), column: z.string(), schema: z.string().optional().default('public'), suggestConversion: z.boolean().optional().default(true), sampleSize: z.number().optional().default(10000) }); export async function findDuplicates( connection: DatabaseConnection, logger: Logger, args: z.infer<typeof FindDuplicatesSchema> ): Promise<any> { const { table, columns, schema, limit, minCount, includeRows } = args; logger.info('findDuplicates', 'Finding duplicate rows', { table, columns }); const sanitizedSchema = sanitizeIdentifier(schema); const sanitizedTable = sanitizeIdentifier(table); const sanitizedColumns = columns.map(sanitizeIdentifier); const columnList = sanitizedColumns.map(escapeIdentifier).join(', '); const groupByList = sanitizedColumns.map((col, idx) => `${idx + 1}`).join(', '); const countQuery = ` SELECT COUNT(*) as total_rows FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} `; const duplicatesQuery = ` SELECT ${columnList}, COUNT(*) as count FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} GROUP BY ${columnList} HAVING COUNT(*) >= $1 ORDER BY COUNT(*) DESC LIMIT $2 `; const [totalResult, duplicatesResult] = await Promise.all([ executeQuery(connection, logger, { query: countQuery }), executeQuery(connection, logger, { query: duplicatesQuery, params: [minCount, limit] }) ]); const totalRows = parseInt(totalResult.rows[0]?.total_rows || '0', 10); const duplicateGroups = duplicatesResult.rows; let duplicateGroupsWithRows = duplicateGroups; if (includeRows && duplicateGroups.length > 0) { duplicateGroupsWithRows = await Promise.all( duplicateGroups.map(async (group) => { const whereConditions = sanitizedColumns.map((col, idx) => { return `${escapeIdentifier(col)} = $${idx + 1}`; }).join(' AND '); const rowsQuery = ` SELECT * FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} WHERE ${whereConditions} LIMIT 10 `; const params = sanitizedColumns.map(col => group[col]); const rowsResult = await executeQuery(connection, logger, { query: rowsQuery, params }); return { ...group, rows: rowsResult.rows }; }) ); } const totalDuplicateRows = duplicateGroups.reduce( (sum, group) => sum + parseInt(group.count, 10), 0 ); const recommendations: string[] = []; if (duplicateGroups.length > 0) { recommendations.push( `Found ${totalDuplicateRows} duplicate rows across ${duplicateGroups.length} groups` ); recommendations.push( `Consider adding UNIQUE constraint: ALTER TABLE ${schema}.${table} ADD CONSTRAINT ${table}_${columns.join('_')}_unique UNIQUE (${columns.join(', ')})` ); recommendations.push( 'Review and delete duplicates, keeping the most recent or earliest record' ); } else { recommendations.push('✓ No duplicates found'); } return { table, schema, columns, totalDuplicateGroups: duplicateGroups.length, affectedRows: totalDuplicateRows, statistics: { totalRows, uniqueRows: totalRows - totalDuplicateRows, duplicateRows: totalDuplicateRows, duplicatePercentage: totalRows > 0 ? ((totalDuplicateRows / totalRows) * 100).toFixed(2) : '0' }, duplicateGroups: duplicateGroupsWithRows.slice(0, limit), recommendations }; } export async function findMissingValues( connection: DatabaseConnection, logger: Logger, args: z.infer<typeof FindMissingValuesSchema> ): Promise<any> { const { table, columns, schema, includeRows, limit } = args; logger.info('findMissingValues', 'Finding NULL values', { table, columns }); const sanitizedSchema = sanitizeIdentifier(schema); const sanitizedTable = sanitizeIdentifier(table); const sanitizedColumns = columns.map(sanitizeIdentifier); const countQuery = ` SELECT COUNT(*) as total_rows FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} `; const totalResult = await executeQuery(connection, logger, { query: countQuery }); const totalRows = parseInt(totalResult.rows[0]?.total_rows || '0', 10); const analysis: Record<string, any> = {}; for (const column of sanitizedColumns) { const nullCountQuery = ` SELECT COUNT(*) FILTER (WHERE ${escapeIdentifier(column)} IS NULL) as null_count FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} `; const nullResult = await executeQuery(connection, logger, { query: nullCountQuery }); const nullCount = parseInt(nullResult.rows[0]?.null_count || '0', 10); const nullPercentage = totalRows > 0 ? ((nullCount / totalRows) * 100).toFixed(2) : '0'; let recommendation = ''; let sampleRows = []; if (nullCount === 0) { recommendation = '✓ No NULL values'; } else { const percentage = parseFloat(nullPercentage); if (percentage < 1) { recommendation = `${nullCount} rows with NULL ${column} - minor issue`; } else if (percentage < 5) { recommendation = `⚠ ${percentage}% of rows missing ${column} - investigate`; } else { recommendation = `⚠ ${percentage}% of rows missing ${column} - set default or make required`; } if (includeRows && nullCount > 0) { const sampleQuery = ` SELECT * FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} WHERE ${escapeIdentifier(column)} IS NULL LIMIT $1 `; const sampleResult = await executeQuery(connection, logger, { query: sampleQuery, params: [limit] }); sampleRows = sampleResult.rows; } } analysis[column] = { nullCount, nullPercentage: parseFloat(nullPercentage), recommendation, ...(sampleRows.length > 0 && { sampleRows }) }; } const recommendations: string[] = []; for (const [column, data] of Object.entries(analysis)) { if (data.nullCount === 0) { recommendations.push(`Consider adding NOT NULL constraint to ${column}`); } else if (data.nullPercentage > 5) { recommendations.push(`High NULL rate in ${column} (${data.nullPercentage}%) - investigate data quality`); } } return { table, schema, totalRows, analysis, recommendations }; } export async function findOrphans( connection: DatabaseConnection, logger: Logger, args: z.infer<typeof FindOrphansSchema> ): Promise<any> { const { table, foreignKey, referenceTable, referenceColumn, schema, referenceSchema, limit } = args; logger.info('findOrphans', 'Finding orphaned records', { table, foreignKey, referenceTable }); const sanitizedSchema = sanitizeIdentifier(schema); const sanitizedTable = sanitizeIdentifier(table); const sanitizedForeignKey = sanitizeIdentifier(foreignKey); const sanitizedRefSchema = sanitizeIdentifier(referenceSchema); const sanitizedRefTable = sanitizeIdentifier(referenceTable); const sanitizedRefColumn = sanitizeIdentifier(referenceColumn); const orphansQuery = ` SELECT t.* FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} t LEFT JOIN ${escapeIdentifier(sanitizedRefSchema)}.${escapeIdentifier(sanitizedRefTable)} r ON t.${escapeIdentifier(sanitizedForeignKey)} = r.${escapeIdentifier(sanitizedRefColumn)} WHERE t.${escapeIdentifier(sanitizedForeignKey)} IS NOT NULL AND r.${escapeIdentifier(sanitizedRefColumn)} IS NULL LIMIT $1 `; const countQuery = ` SELECT COUNT(*) as orphan_count FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} t LEFT JOIN ${escapeIdentifier(sanitizedRefSchema)}.${escapeIdentifier(sanitizedRefTable)} r ON t.${escapeIdentifier(sanitizedForeignKey)} = r.${escapeIdentifier(sanitizedRefColumn)} WHERE t.${escapeIdentifier(sanitizedForeignKey)} IS NOT NULL AND r.${escapeIdentifier(sanitizedRefColumn)} IS NULL `; const totalQuery = ` SELECT COUNT(*) as total_count FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} `; const [orphansResult, countResult, totalResult] = await Promise.all([ executeQuery(connection, logger, { query: orphansQuery, params: [limit] }), executeQuery(connection, logger, { query: countQuery }), executeQuery(connection, logger, { query: totalQuery }) ]); const orphanCount = parseInt(countResult.rows[0]?.orphan_count || '0', 10); const totalCount = parseInt(totalResult.rows[0]?.total_count || '0', 10); const orphanPercentage = totalCount > 0 ? ((orphanCount / totalCount) * 100).toFixed(2) : '0'; const recommendations: string[] = []; if (orphanCount > 0) { recommendations.push(`Found ${orphanCount} orphaned records (${orphanPercentage}% of total)`); recommendations.push( `Delete orphaned records: DELETE FROM ${schema}.${table} WHERE ${foreignKey} NOT IN (SELECT ${referenceColumn} FROM ${referenceSchema}.${referenceTable})` ); recommendations.push( `Or set to NULL: UPDATE ${schema}.${table} SET ${foreignKey} = NULL WHERE ${foreignKey} NOT IN (SELECT ${referenceColumn} FROM ${referenceSchema}.${referenceTable})` ); recommendations.push( `After cleanup, add FK constraint: ALTER TABLE ${schema}.${table} ADD CONSTRAINT ${table}_${foreignKey}_fkey FOREIGN KEY (${foreignKey}) REFERENCES ${referenceSchema}.${referenceTable}(${referenceColumn})` ); } else { recommendations.push('✓ No orphaned records found'); recommendations.push( `Safe to add FK constraint: ALTER TABLE ${schema}.${table} ADD CONSTRAINT ${table}_${foreignKey}_fkey FOREIGN KEY (${foreignKey}) REFERENCES ${referenceSchema}.${referenceTable}(${referenceColumn})` ); } return { table, schema, foreignKey, referenceTable, referenceSchema, referenceColumn, orphanCount, totalCount, orphanPercentage: parseFloat(orphanPercentage), orphanedRows: orphansResult.rows, recommendations }; } export async function checkConstraintViolations( connection: DatabaseConnection, logger: Logger, args: z.infer<typeof CheckConstraintViolationsSchema> ): Promise<any> { const { table, condition, constraintName, schema } = args; logger.info('checkConstraintViolations', 'Checking constraint violations', { table, condition }); validateCondition(condition); const sanitizedSchema = sanitizeIdentifier(schema); const sanitizedTable = sanitizeIdentifier(table); const name = constraintName || `${table}_check`; const violationsQuery = ` SELECT * FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} WHERE NOT (${condition}) LIMIT 100 `; const countQuery = ` SELECT COUNT(*) as violation_count FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} WHERE NOT (${condition}) `; const [violationsResult, countResult] = await Promise.all([ executeQuery(connection, logger, { query: violationsQuery }), executeQuery(connection, logger, { query: countQuery }) ]); const violationCount = parseInt(countResult.rows[0]?.violation_count || '0', 10); const recommendations: string[] = []; if (violationCount > 0) { recommendations.push(`⚠ ${violationCount} rows would violate CHECK constraint`); recommendations.push('Fix violations before adding constraint'); recommendations.push(`Example: UPDATE ${schema}.${table} SET ... WHERE NOT (${condition})`); } else { recommendations.push('✓ No violations found - safe to add constraint'); recommendations.push( `ALTER TABLE ${schema}.${table} ADD CONSTRAINT ${name} CHECK (${condition})` ); } return { table, schema, constraint: name, condition, violationCount, violations: violationsResult.rows.slice(0, 20), recommendations }; } export async function analyzeTypeConsistency( connection: DatabaseConnection, logger: Logger, args: z.infer<typeof AnalyzeTypeConsistencySchema> ): Promise<any> { const { table, column, schema, suggestConversion, sampleSize } = args; logger.info('analyzeTypeConsistency', 'Analyzing type consistency', { table, column }); const sanitizedSchema = sanitizeIdentifier(schema); const sanitizedTable = sanitizeIdentifier(table); const sanitizedColumn = sanitizeIdentifier(column); const typeQuery = ` SELECT data_type as current_type FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 AND column_name = $3 `; const typeResult = await executeQuery(connection, logger, { query: typeQuery, params: [sanitizedSchema, sanitizedTable, sanitizedColumn] }); const currentType = typeResult.rows[0]?.current_type || 'unknown'; const analysisQuery = ` SELECT COUNT(*) as total_rows, COUNT(*) FILTER (WHERE ${escapeIdentifier(sanitizedColumn)} IS NULL) as null_count, COUNT(*) FILTER (WHERE ${escapeIdentifier(sanitizedColumn)} ~ '^[0-9]+$') as integer_count, COUNT(*) FILTER (WHERE ${escapeIdentifier(sanitizedColumn)} ~ '^[0-9]+\\.[0-9]+$') as decimal_count, COUNT(*) FILTER (WHERE ${escapeIdentifier(sanitizedColumn)} ~ '^[0-9]{4}-[0-9]{2}-[0-9]{2}') as date_count, COUNT(*) FILTER (WHERE ${escapeIdentifier(sanitizedColumn)} ~ '^(true|false|t|f|yes|no|y|n|1|0)$') as boolean_count FROM ( SELECT ${escapeIdentifier(sanitizedColumn)} FROM ${escapeIdentifier(sanitizedSchema)}.${escapeIdentifier(sanitizedTable)} LIMIT $1 ) sample `; const analysisResult = await executeQuery(connection, logger, { query: analysisQuery, params: [sampleSize] }); const stats = analysisResult.rows[0]; const totalRows = parseInt(stats.total_rows, 10); const nullCount = parseInt(stats.null_count, 10); const numericCount = parseInt(stats.integer_count, 10) + parseInt(stats.decimal_count, 10); const dateCount = parseInt(stats.date_count, 10); const booleanCount = parseInt(stats.boolean_count, 10); const invalidCount = totalRows - nullCount - numericCount - dateCount - booleanCount; const patterns: Record<string, any> = { numeric: { count: numericCount, percentage: ((numericCount / totalRows) * 100).toFixed(1) }, date: { count: dateCount, percentage: ((dateCount / totalRows) * 100).toFixed(1) }, boolean: { count: booleanCount, percentage: ((booleanCount / totalRows) * 100).toFixed(1) }, null: { count: nullCount, percentage: ((nullCount / totalRows) * 100).toFixed(1) }, invalid: { count: invalidCount, percentage: ((invalidCount / totalRows) * 100).toFixed(1) } }; const recommendations: string[] = []; let suggestedMigration = null; if (numericCount / totalRows > 0.95) { recommendations.push(`✓ ${patterns.numeric.percentage}% of values are numeric`); if (invalidCount > 0) { recommendations.push(`${invalidCount} rows contain non-numeric values - clean up first`); } if (suggestConversion) { const targetType = parseInt(stats.decimal_count, 10) > 0 ? 'numeric(10,2)' : 'integer'; recommendations.push( `Consider converting to ${targetType}: ALTER TABLE ${schema}.${table} ALTER COLUMN ${column} TYPE ${targetType} USING ${column}::${targetType}` ); suggestedMigration = { targetType, needsCleanup: invalidCount > 0, cleanupQuery: `UPDATE ${schema}.${table} SET ${column} = NULL WHERE ${column} !~ '^[0-9.]+$'`, conversionQuery: `ALTER TABLE ${schema}.${table} ALTER COLUMN ${column} TYPE ${targetType} USING ${column}::${targetType}` }; } } else if (dateCount / totalRows > 0.95) { recommendations.push(`✓ ${patterns.date.percentage}% of values are date-like`); recommendations.push(`Consider converting to DATE or TIMESTAMP`); } else if (booleanCount / totalRows > 0.95) { recommendations.push(`✓ ${patterns.boolean.percentage}% of values are boolean-like`); recommendations.push(`Consider converting to BOOLEAN`); } else { recommendations.push('⚠ Data has mixed types - not suitable for type conversion'); } return { table, schema, column, currentType, analysis: { totalRows, sampleSize, patterns }, recommendations, ...(suggestedMigration && { suggestedMigration }) }; } export const dataQualityTools = { findDuplicates: { schema: FindDuplicatesSchema, handler: findDuplicates }, findMissingValues: { schema: FindMissingValuesSchema, handler: findMissingValues }, findOrphans: { schema: FindOrphansSchema, handler: findOrphans }, checkConstraintViolations: { schema: CheckConstraintViolationsSchema, handler: checkConstraintViolations }, analyzeTypeConsistency: { schema: AnalyzeTypeConsistencySchema, handler: analyzeTypeConsistency } };

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bluwork/postgres-scout-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server