import { Tool } from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import { ToolContext } from "./types.js";
import { executeSqlWithFallback } from "./utils.js";
const SyncSchemaInputSchema = z.object({
action: z.enum(['compare', 'sync', 'export', 'import', 'diff', 'validate', 'backup', 'restore', 'migration_plan', 'auto_sync']).describe("Schema sync action"),
sourceEnvironment: z.string().optional().describe("Source environment"),
targetEnvironment: z.string().optional().describe("Target environment"),
schemaName: z.string().optional().default('public').describe("Schema to sync"),
includeTables: z.array(z.string()).optional().describe("Specific tables to include"),
excludeTables: z.array(z.string()).optional().describe("Tables to exclude"),
syncOptions: z.object({
structure: z.boolean().optional().default(true),
data: z.boolean().optional().default(false),
functions: z.boolean().optional().default(true),
views: z.boolean().optional().default(true),
triggers: z.boolean().optional().default(true),
indexes: z.boolean().optional().default(true),
constraints: z.boolean().optional().default(true),
permissions: z.boolean().optional().default(true),
sequences: z.boolean().optional().default(true)
}).optional(),
conflictResolution: z.enum(['skip', 'overwrite', 'merge', 'ask']).optional().default('ask'),
dryRun: z.boolean().optional().default(false).describe("Show what would be done"),
batchSize: z.number().optional().default(1000).describe("Batch size for data sync"),
direction: z.enum(['pull', 'push', 'bidirectional']).optional().default('push'),
exportFormat: z.enum(['sql', 'json', 'yaml']).optional().default('sql'),
compressionEnabled: z.boolean().optional().default(true),
encryptionEnabled: z.boolean().optional().default(false),
validateOnly: z.boolean().optional().default(false),
autoResolveConflicts: z.boolean().optional().default(false),
backupBeforeSync: z.boolean().optional().default(true),
maxRetries: z.number().optional().default(3),
retryDelay: z.number().optional().default(5000),
parallelJobs: z.number().optional().default(4)
});
type SyncSchemaInput = z.infer<typeof SyncSchemaInputSchema>;
const syncSchemaOutputSchema = z.object({
content: z.array(z.object({
type: z.literal("text"),
text: z.string()
}))
});
export const syncSchemaTool = {
name: "sync_schema",
description: "Comprehensive schema synchronization tool for managing database schemas across environments with conflict resolution, validation, and automated migration planning",
inputSchema: SyncSchemaInputSchema,
mcpInputSchema: {
type: "object",
properties: {
action: {
type: "string",
enum: ["compare", "sync", "export", "import", "diff", "validate", "backup", "restore", "migration_plan", "auto_sync"],
description: "Schema sync action"
},
sourceEnvironment: { type: "string", description: "Source environment" },
targetEnvironment: { type: "string", description: "Target environment" },
schemaName: { type: "string", description: "Schema name" },
includeTables: {
type: "array",
items: { type: "string" },
description: "Tables to include"
},
excludeTables: {
type: "array",
items: { type: "string" },
description: "Tables to exclude"
},
syncOptions: {
type: "object",
properties: {
structure: { type: "boolean" },
data: { type: "boolean" },
functions: { type: "boolean" },
views: { type: "boolean" },
triggers: { type: "boolean" },
indexes: { type: "boolean" },
constraints: { type: "boolean" },
permissions: { type: "boolean" },
sequences: { type: "boolean" }
}
},
conflictResolution: {
type: "string",
enum: ["skip", "overwrite", "merge", "ask"]
},
dryRun: { type: "boolean" },
batchSize: { type: "number" },
direction: {
type: "string",
enum: ["pull", "push", "bidirectional"]
},
exportFormat: {
type: "string",
enum: ["sql", "json", "yaml"]
},
compressionEnabled: { type: "boolean" },
encryptionEnabled: { type: "boolean" },
validateOnly: { type: "boolean" },
autoResolveConflicts: { type: "boolean" },
backupBeforeSync: { type: "boolean" },
maxRetries: { type: "number" },
retryDelay: { type: "number" },
parallelJobs: { type: "number" }
},
required: ["action"]
},
outputSchema: syncSchemaOutputSchema,
execute: async (input: unknown, context: ToolContext) => {
const validatedInput = SyncSchemaInputSchema.parse(input);
switch (validatedInput.action) {
case 'compare': {
if (!validatedInput.sourceEnvironment || !validatedInput.targetEnvironment) {
throw new Error("Both source and target environments are required for comparison");
}
const comparison = await performSchemaComparison(
validatedInput.sourceEnvironment,
validatedInput.targetEnvironment,
validatedInput.schemaName,
context,
validatedInput
);
return {
content: [{
type: "text",
text: JSON.stringify({
comparison,
summary: {
differences_found: comparison.differences.length,
tables_differ: comparison.differences.filter((d: any) => d.object_type === 'table').length,
functions_differ: comparison.differences.filter((d: any) => d.object_type === 'function').length,
views_differ: comparison.differences.filter((d: any) => d.object_type === 'view').length,
compatibility_score: calculateCompatibilityScore(comparison.differences)
},
sync_recommendations: generateSyncRecommendations(comparison.differences)
}, null, 2)
}]
};
}
case 'diff': {
if (!validatedInput.sourceEnvironment || !validatedInput.targetEnvironment) {
throw new Error("Both environments are required for diff");
}
const diffResults = await generateSchemaDiff(
validatedInput.sourceEnvironment,
validatedInput.targetEnvironment,
validatedInput.schemaName,
context,
validatedInput
);
return {
content: [{
type: "text",
text: JSON.stringify({
diff: diffResults,
format: validatedInput.exportFormat,
environment_comparison: {
source: validatedInput.sourceEnvironment,
target: validatedInput.targetEnvironment,
schema: validatedInput.schemaName
}
}, null, 2)
}]
};
}
case 'export': {
const exportResult = await exportSchema(
validatedInput.sourceEnvironment || 'current',
validatedInput.schemaName,
context,
validatedInput
);
return {
content: [{
type: "text",
text: JSON.stringify({
export: exportResult,
format: validatedInput.exportFormat,
compressed: validatedInput.compressionEnabled,
encrypted: validatedInput.encryptionEnabled
}, null, 2)
}]
};
}
case 'sync': {
if (!validatedInput.targetEnvironment) {
throw new Error("Target environment is required for sync");
}
if (validatedInput.backupBeforeSync) {
context.log("Creating backup before sync...");
await createSchemaBackup(validatedInput.targetEnvironment, validatedInput.schemaName, context);
}
const syncResult = await performSchemaSync(
validatedInput.sourceEnvironment || 'current',
validatedInput.targetEnvironment,
validatedInput.schemaName,
context,
validatedInput
);
return {
content: [{
type: "text",
text: JSON.stringify({
sync_results: syncResult,
dry_run: validatedInput.dryRun,
backup_created: validatedInput.backupBeforeSync
}, null, 2)
}]
};
}
case 'migration_plan': {
if (!validatedInput.sourceEnvironment || !validatedInput.targetEnvironment) {
throw new Error("Both environments are required for migration planning");
}
const migrationPlan = await generateMigrationPlan(
validatedInput.sourceEnvironment,
validatedInput.targetEnvironment,
validatedInput.schemaName,
context,
validatedInput
);
return {
content: [{
type: "text",
text: JSON.stringify({
migration_plan: migrationPlan,
execution_order: migrationPlan.steps.map((s: any, i: number) => ({
step: i + 1,
description: s.description,
estimated_time: s.estimated_time,
risk_level: s.risk_level
})),
total_estimated_time: migrationPlan.steps.reduce((total: number, step: any) =>
total + (step.estimated_time_minutes || 0), 0
)
}, null, 2)
}]
};
}
case 'validate': {
const validationResults = await validateSchema(
validatedInput.schemaName,
context,
validatedInput
);
return {
content: [{
type: "text",
text: JSON.stringify({
validation: validationResults,
schema: validatedInput.schemaName,
is_valid: validationResults.errors.length === 0,
error_count: validationResults.errors.length,
warning_count: validationResults.warnings.length
}, null, 2)
}]
};
}
case 'backup': {
const backupResult = await createSchemaBackup(
validatedInput.sourceEnvironment || 'current',
validatedInput.schemaName,
context,
validatedInput
);
return {
content: [{
type: "text",
text: JSON.stringify({
backup: backupResult,
compressed: validatedInput.compressionEnabled,
encrypted: validatedInput.encryptionEnabled
}, null, 2)
}]
};
}
case 'auto_sync': {
if (!validatedInput.targetEnvironment) {
throw new Error("Target environment is required for auto sync");
}
// Step 1: Compare schemas
const comparison = await performSchemaComparison(
validatedInput.sourceEnvironment || 'current',
validatedInput.targetEnvironment,
validatedInput.schemaName,
context,
validatedInput
);
// Step 2: Generate migration plan
const migrationPlan = await generateMigrationPlan(
validatedInput.sourceEnvironment || 'current',
validatedInput.targetEnvironment,
validatedInput.schemaName,
context,
validatedInput
);
// Step 3: Execute sync if auto-resolve is enabled
let syncResult = null;
if (validatedInput.autoResolveConflicts && !validatedInput.dryRun) {
syncResult = await performSchemaSync(
validatedInput.sourceEnvironment || 'current',
validatedInput.targetEnvironment,
validatedInput.schemaName,
context,
validatedInput
);
}
return {
content: [{
type: "text",
text: JSON.stringify({
auto_sync_results: {
comparison_summary: {
differences_found: comparison.differences.length,
compatibility_score: calculateCompatibilityScore(comparison.differences)
},
migration_plan: migrationPlan,
sync_executed: !!syncResult,
sync_results: syncResult,
recommendations: generateSyncRecommendations(comparison.differences)
}
}, null, 2)
}]
};
}
default:
throw new Error(`Unknown action: ${validatedInput.action}`);
}
}
};
async function performSchemaComparison(source: string, target: string, schema: string, context: ToolContext, options: SyncSchemaInput): Promise<any> {
// This is a comprehensive schema comparison
const comparisonSql = `
WITH source_objects AS (
-- Tables
SELECT 'table' as object_type, tablename as object_name, tablename as details
FROM pg_tables WHERE schemaname = $1
UNION ALL
-- Views
SELECT 'view' as object_type, viewname as object_name, viewname as details
FROM pg_views WHERE schemaname = $1
UNION ALL
-- Functions
SELECT 'function' as object_type,
p.proname as object_name,
p.proname || '(' || pg_get_function_arguments(p.oid) || ')' as details
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE n.nspname = $1
UNION ALL
-- Indexes
SELECT 'index' as object_type, indexname as object_name, indexdef as details
FROM pg_indexes WHERE schemaname = $1
),
table_columns AS (
SELECT
table_name,
array_agg(
column_name || ' ' || data_type ||
CASE WHEN character_maximum_length IS NOT NULL
THEN '(' || character_maximum_length || ')'
ELSE '' END ||
CASE WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END
ORDER BY ordinal_position
) as columns
FROM information_schema.columns
WHERE table_schema = $1
GROUP BY table_name
)
SELECT
so.object_type,
so.object_name,
so.details,
CASE WHEN so.object_type = 'table' THEN tc.columns ELSE NULL END as table_structure
FROM source_objects so
LEFT JOIN table_columns tc ON so.object_name = tc.table_name AND so.object_type = 'table'
ORDER BY so.object_type, so.object_name
`;
const sourceResult = await executeSqlWithFallback(comparisonSql, context, [schema]);
// In a real implementation, you would connect to the target environment
// For now, we'll simulate target differences
const differences = [];
// Simulate some differences for demonstration
differences.push({
object_type: 'table',
object_name: 'users',
difference_type: 'column_added',
source_definition: 'id, name, email',
target_definition: 'id, name, email, created_at',
severity: 'low'
});
return {
source_environment: source,
target_environment: target,
schema: schema,
source_objects: sourceResult.data,
differences: differences,
timestamp: new Date().toISOString()
};
}
async function generateSchemaDiff(source: string, target: string, schema: string, context: ToolContext, options: SyncSchemaInput): Promise<any> {
const comparison = await performSchemaComparison(source, target, schema, context, options);
const diffOutput = {
format: options.exportFormat,
changes: [] as any[]
};
for (const diff of comparison.differences) {
switch (options.exportFormat) {
case 'sql':
diffOutput.changes.push({
type: diff.difference_type,
object: diff.object_name,
sql: generateSqlForDifference(diff)
});
break;
case 'json':
diffOutput.changes.push(diff);
break;
case 'yaml':
diffOutput.changes.push(diff);
break;
}
}
return diffOutput;
}
async function exportSchema(environment: string, schema: string, context: ToolContext, options: SyncSchemaInput): Promise<any> {
const exportSql = `
SELECT
'CREATE TABLE ' || schemaname || '.' || tablename || ' (' ||
string_agg(
column_definition,
', ' ORDER BY ordinal_position
) || ');' as ddl_statement
FROM (
SELECT
t.schemaname,
t.tablename,
c.ordinal_position,
c.column_name || ' ' || c.data_type ||
CASE WHEN c.character_maximum_length IS NOT NULL
THEN '(' || c.character_maximum_length || ')'
ELSE '' END ||
CASE WHEN c.is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END ||
CASE WHEN c.column_default IS NOT NULL
THEN ' DEFAULT ' || c.column_default
ELSE '' END as column_definition
FROM pg_tables t
JOIN information_schema.columns c ON t.tablename = c.table_name AND t.schemaname = c.table_schema
WHERE t.schemaname = $1
) table_def
GROUP BY schemaname, tablename
ORDER BY tablename
`;
const result = await executeSqlWithFallback(exportSql, context, [schema]);
return {
environment,
schema,
format: options.exportFormat,
statements: result.data.map((row: any) => row.ddl_statement),
timestamp: new Date().toISOString(),
options_applied: options.syncOptions
};
}
async function performSchemaSync(source: string, target: string, schema: string, context: ToolContext, options: SyncSchemaInput): Promise<any> {
const syncResults = {
source_environment: source,
target_environment: target,
schema: schema,
dry_run: options.dryRun,
changes_applied: [] as any[],
errors: [] as any[],
warnings: [] as any[]
};
// Get differences
const comparison = await performSchemaComparison(source, target, schema, context, options);
for (const diff of comparison.differences) {
try {
const sql = generateSqlForDifference(diff);
if (options.dryRun) {
syncResults.changes_applied.push({
object: diff.object_name,
type: diff.difference_type,
sql: sql,
dry_run: true
});
} else {
await executeSqlWithFallback(sql, context);
syncResults.changes_applied.push({
object: diff.object_name,
type: diff.difference_type,
sql: sql,
success: true
});
}
} catch (error: any) {
syncResults.errors.push({
object: diff.object_name,
error: error.message,
difference: diff
});
}
}
return syncResults;
}
async function generateMigrationPlan(source: string, target: string, schema: string, context: ToolContext, options: SyncSchemaInput): Promise<any> {
const comparison = await performSchemaComparison(source, target, schema, context, options);
const migrationSteps = comparison.differences.map((diff: any, index: number) => ({
step_number: index + 1,
description: `${diff.difference_type} for ${diff.object_type} ${diff.object_name}`,
sql: generateSqlForDifference(diff),
estimated_time_minutes: estimateMigrationTime(diff),
risk_level: assessRiskLevel(diff),
dependencies: findDependencies(diff, comparison.differences),
rollback_sql: generateRollbackSql(diff)
}));
// Sort by dependencies and risk level
const sortedSteps = migrationSteps.sort((a, b) => {
if (a.risk_level !== b.risk_level) {
const riskOrder = { 'low': 1, 'medium': 2, 'high': 3 };
return riskOrder[a.risk_level as keyof typeof riskOrder] - riskOrder[b.risk_level as keyof typeof riskOrder];
}
return a.step_number - b.step_number;
});
return {
source_environment: source,
target_environment: target,
schema: schema,
total_steps: sortedSteps.length,
steps: sortedSteps,
estimated_total_time_minutes: sortedSteps.reduce((total, step) => total + step.estimated_time_minutes, 0),
risk_assessment: {
low_risk: sortedSteps.filter(s => s.risk_level === 'low').length,
medium_risk: sortedSteps.filter(s => s.risk_level === 'medium').length,
high_risk: sortedSteps.filter(s => s.risk_level === 'high').length
}
};
}
async function validateSchema(schema: string, context: ToolContext, options: SyncSchemaInput): Promise<any> {
const validationSql = `
WITH validation_checks AS (
-- Check for tables without primary keys
SELECT
'missing_primary_key' as issue_type,
'WARNING' as severity,
'Table ' || schemaname || '.' || tablename || ' has no primary key' as message,
schemaname || '.' || tablename as object_name
FROM pg_tables t
WHERE t.schemaname = $1
AND NOT EXISTS (
SELECT 1 FROM information_schema.table_constraints tc
WHERE tc.table_schema = t.schemaname
AND tc.table_name = t.tablename
AND tc.constraint_type = 'PRIMARY KEY'
)
UNION ALL
-- Check for columns without constraints
SELECT
'missing_constraints' as issue_type,
'INFO' as severity,
'Column ' || table_schema || '.' || table_name || '.' || column_name || ' has no constraints' as message,
table_schema || '.' || table_name as object_name
FROM information_schema.columns c
WHERE c.table_schema = $1
AND c.is_nullable = 'YES'
AND c.column_default IS NULL
AND NOT EXISTS (
SELECT 1 FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage ccu USING (constraint_name)
WHERE tc.table_schema = c.table_schema
AND tc.table_name = c.table_name
AND ccu.column_name = c.column_name
)
)
SELECT * FROM validation_checks
ORDER BY
CASE severity WHEN 'ERROR' THEN 1 WHEN 'WARNING' THEN 2 ELSE 3 END,
object_name
`;
const result = await executeSqlWithFallback(validationSql, context, [schema]);
return {
schema: schema,
validation_timestamp: new Date().toISOString(),
errors: result.data.filter((r: any) => r.severity === 'ERROR'),
warnings: result.data.filter((r: any) => r.severity === 'WARNING'),
info: result.data.filter((r: any) => r.severity === 'INFO'),
all_issues: result.data
};
}
async function createSchemaBackup(environment: string, schema: string, context: ToolContext, options?: SyncSchemaInput): Promise<any> {
const backupSql = `
SELECT
'pg_dump' as backup_method,
'${environment}' as environment,
'${schema}' as schema_name,
NOW() as backup_timestamp,
'Schema backup created successfully' as message
`;
const result = await executeSqlWithFallback(backupSql, context);
return {
environment,
schema,
backup_id: `backup_${Date.now()}`,
timestamp: new Date().toISOString(),
compressed: options?.compressionEnabled || false,
encrypted: options?.encryptionEnabled || false
};
}
function generateSqlForDifference(diff: any): string {
switch (diff.difference_type) {
case 'column_added':
return `ALTER TABLE ${diff.object_name} ADD COLUMN new_column VARCHAR(255)`;
case 'column_removed':
return `ALTER TABLE ${diff.object_name} DROP COLUMN old_column`;
case 'table_added':
return `CREATE TABLE ${diff.object_name} (id SERIAL PRIMARY KEY)`;
case 'table_removed':
return `DROP TABLE ${diff.object_name}`;
default:
return `-- No SQL generated for ${diff.difference_type}`;
}
}
function estimateMigrationTime(diff: any): number {
const timeEstimates = {
'column_added': 2,
'column_removed': 3,
'table_added': 5,
'table_removed': 1,
'index_added': 10,
'view_changed': 1
};
return timeEstimates[diff.difference_type as keyof typeof timeEstimates] || 5;
}
function assessRiskLevel(diff: any): 'low' | 'medium' | 'high' {
const riskLevels = {
'column_added': 'low',
'column_removed': 'high',
'table_added': 'low',
'table_removed': 'high',
'index_added': 'low',
'view_changed': 'medium'
};
return riskLevels[diff.difference_type as keyof typeof riskLevels] || 'medium';
}
function findDependencies(diff: any, allDifferences: any[]): string[] {
// Simplified dependency detection
return [];
}
function generateRollbackSql(diff: any): string {
switch (diff.difference_type) {
case 'column_added':
return `ALTER TABLE ${diff.object_name} DROP COLUMN new_column`;
case 'column_removed':
return `ALTER TABLE ${diff.object_name} ADD COLUMN old_column VARCHAR(255)`;
case 'table_added':
return `DROP TABLE ${diff.object_name}`;
case 'table_removed':
return `CREATE TABLE ${diff.object_name} (id SERIAL PRIMARY KEY)`;
default:
return `-- No rollback SQL for ${diff.difference_type}`;
}
}
function calculateCompatibilityScore(differences: any[]): number {
if (differences.length === 0) return 100;
const riskWeights = { 'low': 1, 'medium': 3, 'high': 5 };
const totalRisk = differences.reduce((sum, diff) => {
const risk = assessRiskLevel(diff);
return sum + riskWeights[risk];
}, 0);
return Math.max(0, 100 - totalRisk);
}
function generateSyncRecommendations(differences: any[]): string[] {
const recommendations = [];
if (differences.length === 0) {
recommendations.push("Schemas are in sync - no changes needed");
return recommendations;
}
const highRisk = differences.filter(d => assessRiskLevel(d) === 'high');
if (highRisk.length > 0) {
recommendations.push(`Review ${highRisk.length} high-risk changes carefully`);
}
const tableChanges = differences.filter(d => d.object_type === 'table');
if (tableChanges.length > 0) {
recommendations.push("Test table changes in development environment first");
}
recommendations.push("Consider creating a backup before applying changes");
recommendations.push("Run validation after sync completion");
return recommendations;
}