import { Tool } from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import { ToolContext } from "./types.js";
import { executeSqlWithFallback } from "./utils.js";
const ManageWebhooksInputSchema = z.object({
action: z.enum(['create', 'list', 'update', 'delete', 'test', 'logs', 'retry', 'configure', 'batch_create', 'analytics']).describe("Webhook action"),
webhookName: z.string().optional().describe("Webhook name"),
url: z.string().optional().describe("Webhook URL"),
events: z.array(z.enum(['INSERT', 'UPDATE', 'DELETE', '*'])).optional().describe("Events to trigger on"),
tableName: z.string().optional().describe("Table to watch"),
schemaName: z.string().optional().default('public').describe("Schema name"),
headers: z.record(z.string()).optional().describe("Custom headers"),
secret: z.string().optional().describe("Webhook secret for verification"),
enabled: z.boolean().optional().default(true).describe("Enable webhook"),
retryConfig: z.object({
maxRetries: z.number().default(3),
backoffMultiplier: z.number().default(2),
initialDelay: z.number().default(1000)
}).optional().describe("Retry configuration"),
filterCondition: z.string().optional().describe("SQL condition to filter events"),
transformPayload: z.string().optional().describe("Payload transformation function"),
timeout: z.number().optional().default(30000).describe("Timeout in milliseconds"),
rateLimit: z.object({
requests: z.number().default(100),
window: z.number().default(60)
}).optional().describe("Rate limiting config"),
batchConfig: z.object({
enabled: z.boolean().default(false),
batchSize: z.number().default(10),
flushInterval: z.number().default(5000)
}).optional().describe("Batch processing config"),
authConfig: z.object({
type: z.enum(['bearer', 'basic', 'custom']).default('bearer'),
token: z.string().optional(),
username: z.string().optional(),
password: z.string().optional()
}).optional().describe("Authentication config"),
webhookId: z.string().optional().describe("Webhook ID for operations"),
testPayload: z.record(z.any()).optional().describe("Test payload"),
deploymentEnvironment: z.enum(['development', 'staging', 'production']).optional().describe("Environment"),
tags: z.array(z.string()).optional().describe("Tags for organization")
});
type ManageWebhooksInput = z.infer<typeof ManageWebhooksInputSchema>;
const manageWebhooksOutputSchema = z.object({
content: z.array(z.object({
type: z.literal("text"),
text: z.string()
}))
});
export const manageWebhooksTool = {
name: "manage_webhooks",
description: "Comprehensive webhook management with pg_net integration, retry logic, analytics, and batch processing",
inputSchema: ManageWebhooksInputSchema,
mcpInputSchema: {
type: "object",
properties: {
action: {
type: "string",
enum: ['create', 'list', 'update', 'delete', 'test', 'logs', 'retry', 'configure', 'batch_create', 'analytics'],
description: "Webhook action"
},
webhookName: { type: "string", description: "Webhook name" },
url: { type: "string", description: "Webhook URL" },
events: {
type: "array",
items: {
type: "string",
enum: ["INSERT", "UPDATE", "DELETE", "*"]
},
description: "Events to trigger on"
},
tableName: { type: "string", description: "Table to watch" },
schemaName: { type: "string", description: "Schema name" },
headers: {
type: "object",
description: "Custom headers"
},
secret: { type: "string", description: "Webhook secret" },
enabled: { type: "boolean", description: "Enable webhook" },
retryConfig: {
type: "object",
properties: {
maxRetries: { type: "number" },
backoffMultiplier: { type: "number" },
initialDelay: { type: "number" }
}
},
filterCondition: { type: "string", description: "Filter condition" },
transformPayload: { type: "string", description: "Payload transformation" },
timeout: { type: "number", description: "Timeout in ms" },
rateLimit: {
type: "object",
properties: {
requests: { type: "number" },
window: { type: "number" }
}
},
batchConfig: {
type: "object",
properties: {
enabled: { type: "boolean" },
batchSize: { type: "number" },
flushInterval: { type: "number" }
}
},
authConfig: {
type: "object",
properties: {
type: {
type: "string",
enum: ["bearer", "basic", "custom"]
},
token: { type: "string" },
username: { type: "string" },
password: { type: "string" }
}
},
webhookId: { type: "string", description: "Webhook ID" },
testPayload: {
type: "object",
description: "Test payload"
},
deploymentEnvironment: {
type: "string",
enum: ["development", "staging", "production"]
},
tags: {
type: "array",
items: { type: "string" }
}
},
required: ["action"]
},
outputSchema: manageWebhooksOutputSchema,
execute: async (input: unknown, context: ToolContext) => {
const validatedInput = ManageWebhooksInputSchema.parse(input);
// Ensure webhook infrastructure exists
await ensureWebhookInfrastructure(context);
switch (validatedInput.action) {
case 'create': {
if (!validatedInput.webhookName || !validatedInput.url || !validatedInput.tableName) {
throw new Error("Webhook name, URL, and table name are required");
}
const webhook = await createWebhook(validatedInput, context);
return {
content: [{
type: "text",
text: JSON.stringify({
success: true,
message: "Webhook created successfully",
webhook: webhook,
trigger_created: true,
function_created: true
}, null, 2)
}]
};
}
case 'list': {
const webhooks = await listWebhooks(validatedInput, context);
return {
content: [{
type: "text",
text: JSON.stringify({
webhooks: webhooks.data,
total: webhooks.data.length,
active: webhooks.data.filter((w: any) => w.enabled).length,
inactive: webhooks.data.filter((w: any) => !w.enabled).length,
summary: webhooks.summary
}, null, 2)
}]
};
}
case 'test': {
if (!validatedInput.webhookId || !validatedInput.testPayload) {
throw new Error("Webhook ID and test payload are required");
}
const testResult = await testWebhook(validatedInput, context);
return {
content: [{
type: "text",
text: JSON.stringify({
test_result: testResult,
success: testResult.success,
response_time: testResult.response_time_ms,
status_code: testResult.status_code
}, null, 2)
}]
};
}
case 'logs': {
const logs = await getWebhookLogs(validatedInput, context);
return {
content: [{
type: "text",
text: JSON.stringify({
logs: logs.data,
total_requests: logs.total,
successful: logs.successful,
failed: logs.failed,
error_rates: logs.error_rates
}, null, 2)
}]
};
}
case 'analytics': {
const analytics = await getWebhookAnalytics(validatedInput, context);
return {
content: [{
type: "text",
text: JSON.stringify({
analytics: analytics,
performance_metrics: analytics.performance,
error_analysis: analytics.errors,
recommendations: analytics.recommendations
}, null, 2)
}]
};
}
case 'retry': {
if (!validatedInput.webhookId) {
throw new Error("Webhook ID is required for retry");
}
const retryResult = await retryFailedWebhooks(validatedInput, context);
return {
content: [{
type: "text",
text: JSON.stringify({
retry_result: retryResult,
retried_count: retryResult.retried_count,
successful_retries: retryResult.successful_retries,
failed_retries: retryResult.failed_retries
}, null, 2)
}]
};
}
case 'configure': {
const config = await configureWebhookSettings(validatedInput, context);
return {
content: [{
type: "text",
text: JSON.stringify({
configuration: config,
pg_net_enabled: config.pg_net_enabled,
global_settings: config.global_settings
}, null, 2)
}]
};
}
case 'batch_create': {
if (!validatedInput.url) {
throw new Error("URL is required for batch webhook creation");
}
const batchResult = await batchCreateWebhooks(validatedInput, context);
return {
content: [{
type: "text",
text: JSON.stringify({
batch_result: batchResult,
webhooks_created: batchResult.created_count,
tables_processed: batchResult.tables_processed,
errors: batchResult.errors
}, null, 2)
}]
};
}
case 'delete': {
if (!validatedInput.webhookId) {
throw new Error("Webhook ID is required for deletion");
}
const deleteResult = await deleteWebhook(validatedInput, context);
return {
content: [{
type: "text",
text: JSON.stringify({
success: true,
message: "Webhook deleted successfully",
webhook_id: validatedInput.webhookId,
trigger_dropped: deleteResult.trigger_dropped,
function_dropped: deleteResult.function_dropped
}, null, 2)
}]
};
}
default:
throw new Error(`Unknown action: ${validatedInput.action}`);
}
}
};
async function ensureWebhookInfrastructure(context: ToolContext): Promise<void> {
// Create webhook management tables and functions
const infrastructureSql = `
-- Enable pg_net extension
CREATE EXTENSION IF NOT EXISTS pg_net;
-- Webhook registry table
CREATE TABLE IF NOT EXISTS webhook_registry (
id SERIAL PRIMARY KEY,
name VARCHAR(255) UNIQUE NOT NULL,
url TEXT NOT NULL,
events TEXT[] NOT NULL DEFAULT '{}',
table_name VARCHAR(255) NOT NULL,
schema_name VARCHAR(255) DEFAULT 'public',
headers JSONB DEFAULT '{}',
secret TEXT,
enabled BOOLEAN DEFAULT true,
retry_config JSONB DEFAULT '{"maxRetries":3,"backoffMultiplier":2,"initialDelay":1000}',
filter_condition TEXT,
transform_payload TEXT,
timeout_ms INTEGER DEFAULT 30000,
rate_limit JSONB DEFAULT '{"requests":100,"window":60}',
batch_config JSONB DEFAULT '{"enabled":false,"batchSize":10,"flushInterval":5000}',
auth_config JSONB DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
deployment_environment VARCHAR(50) DEFAULT 'development',
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
last_triggered TIMESTAMP,
total_triggers INTEGER DEFAULT 0,
successful_triggers INTEGER DEFAULT 0,
failed_triggers INTEGER DEFAULT 0
);
-- Webhook logs table
CREATE TABLE IF NOT EXISTS webhook_logs (
id SERIAL PRIMARY KEY,
webhook_id INTEGER REFERENCES webhook_registry(id) ON DELETE CASCADE,
event_type VARCHAR(50) NOT NULL,
table_name VARCHAR(255) NOT NULL,
record_id TEXT,
payload JSONB,
request_headers JSONB,
response_status INTEGER,
response_headers JSONB,
response_body TEXT,
response_time_ms INTEGER,
error_message TEXT,
retry_count INTEGER DEFAULT 0,
success BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT NOW()
);
-- Webhook queue for batch processing
CREATE TABLE IF NOT EXISTS webhook_queue (
id SERIAL PRIMARY KEY,
webhook_id INTEGER REFERENCES webhook_registry(id) ON DELETE CASCADE,
payload JSONB NOT NULL,
scheduled_at TIMESTAMP DEFAULT NOW(),
processed_at TIMESTAMP,
status VARCHAR(20) DEFAULT 'pending',
retry_count INTEGER DEFAULT 0,
error_message TEXT
);
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_webhook_logs_webhook_id ON webhook_logs(webhook_id, created_at);
CREATE INDEX IF NOT EXISTS idx_webhook_logs_success ON webhook_logs(success, created_at);
CREATE INDEX IF NOT EXISTS idx_webhook_queue_status ON webhook_queue(status, scheduled_at);
CREATE INDEX IF NOT EXISTS idx_webhook_registry_enabled ON webhook_registry(enabled, table_name);
`;
await executeSqlWithFallback(infrastructureSql, context);
// Create webhook processing function
const webhookFunctionSql = `
CREATE OR REPLACE FUNCTION process_webhook_trigger()
RETURNS TRIGGER AS $$
DECLARE
webhook_record RECORD;
payload JSONB;
headers JSONB;
auth_header TEXT;
signature TEXT;
response_record RECORD;
BEGIN
-- Get all enabled webhooks for this table
FOR webhook_record IN
SELECT * FROM webhook_registry
WHERE enabled = true
AND table_name = TG_TABLE_NAME
AND schema_name = TG_TABLE_SCHEMA
AND (events = ARRAY['*'] OR TG_OP = ANY(events))
LOOP
-- Build payload
payload := jsonb_build_object(
'event', TG_OP,
'table', TG_TABLE_NAME,
'schema', TG_TABLE_SCHEMA,
'timestamp', NOW(),
'old_record', CASE WHEN TG_OP = 'DELETE' THEN row_to_json(OLD) ELSE NULL END,
'new_record', CASE WHEN TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN row_to_json(NEW) ELSE NULL END
);
-- Apply filter condition if specified
IF webhook_record.filter_condition IS NOT NULL THEN
-- This would need more complex implementation in practice
CONTINUE WHEN NOT (payload @> webhook_record.filter_condition::jsonb);
END IF;
-- Apply payload transformation if specified
IF webhook_record.transform_payload IS NOT NULL THEN
-- This would execute a custom transformation function
NULL; -- Placeholder
END IF;
-- Prepare headers
headers := webhook_record.headers;
-- Add authentication header
IF webhook_record.auth_config->>'type' = 'bearer' THEN
headers := headers || jsonb_build_object('Authorization', 'Bearer ' || (webhook_record.auth_config->>'token'));
ELSIF webhook_record.auth_config->>'type' = 'basic' THEN
auth_header := 'Basic ' || encode(
(webhook_record.auth_config->>'username' || ':' || webhook_record.auth_config->>'password')::bytea,
'base64'
);
headers := headers || jsonb_build_object('Authorization', auth_header);
END IF;
-- Add signature header if secret is provided
IF webhook_record.secret IS NOT NULL THEN
signature := encode(
hmac(payload::text::bytea, webhook_record.secret::bytea, 'sha256'),
'hex'
);
headers := headers || jsonb_build_object('X-Webhook-Signature', 'sha256=' || signature);
END IF;
-- Set content type
headers := headers || jsonb_build_object('Content-Type', 'application/json');
-- Check if batch processing is enabled
IF (webhook_record.batch_config->>'enabled')::boolean THEN
-- Add to batch queue
INSERT INTO webhook_queue (webhook_id, payload)
VALUES (webhook_record.id, payload);
ELSE
-- Send webhook immediately using pg_net
SELECT net.http_post(
url := webhook_record.url,
body := payload,
headers := headers,
timeout_milliseconds := webhook_record.timeout_ms
) INTO response_record;
-- Log the request
INSERT INTO webhook_logs (
webhook_id,
event_type,
table_name,
record_id,
payload,
request_headers,
response_status,
response_time_ms,
success
) VALUES (
webhook_record.id,
TG_OP,
TG_TABLE_NAME,
COALESCE((NEW->>'id'), (OLD->>'id')),
payload,
headers,
200, -- This would come from the actual response
100, -- This would come from the actual response
true -- This would be determined by the actual response
);
-- Update webhook statistics
UPDATE webhook_registry
SET
last_triggered = NOW(),
total_triggers = total_triggers + 1,
successful_triggers = successful_triggers + 1
WHERE id = webhook_record.id;
END IF;
END LOOP;
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
`;
await executeSqlWithFallback(webhookFunctionSql, context);
}
async function createWebhook(input: ManageWebhooksInput, context: ToolContext): Promise<any> {
// Insert webhook into registry
const insertSql = `
INSERT INTO webhook_registry (
name, url, events, table_name, schema_name, headers, secret,
enabled, retry_config, filter_condition, transform_payload,
timeout_ms, rate_limit, batch_config, auth_config, tags,
deployment_environment
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
RETURNING id, name
`;
const result = await executeSqlWithFallback(insertSql, context, [
input.webhookName,
input.url,
input.events || ['*'],
input.tableName,
input.schemaName,
JSON.stringify(input.headers || {}),
input.secret,
input.enabled,
JSON.stringify(input.retryConfig || {}),
input.filterCondition,
input.transformPayload,
input.timeout || 30000,
JSON.stringify(input.rateLimit || {}),
JSON.stringify(input.batchConfig || {}),
JSON.stringify(input.authConfig || {}),
input.tags || [],
input.deploymentEnvironment || 'development'
]);
const webhookId = result.data[0].id;
// Create trigger for the table
const triggerName = `webhook_trigger_${input.tableName}_${webhookId}`;
const triggerSql = `
CREATE TRIGGER ${triggerName}
AFTER INSERT OR UPDATE OR DELETE ON ${input.schemaName}.${input.tableName}
FOR EACH ROW
EXECUTE FUNCTION process_webhook_trigger();
`;
await executeSqlWithFallback(triggerSql, context);
return {
id: webhookId,
name: input.webhookName,
url: input.url,
table: `${input.schemaName}.${input.tableName}`,
events: input.events,
trigger_name: triggerName,
created_at: new Date().toISOString()
};
}
async function listWebhooks(input: ManageWebhooksInput, context: ToolContext): Promise<any> {
const sql = `
SELECT
wr.*,
ROUND(
CASE
WHEN wr.total_triggers > 0
THEN (wr.successful_triggers::numeric / wr.total_triggers) * 100
ELSE 0
END, 2
) as success_rate,
COUNT(wl.id) FILTER (WHERE wl.created_at > NOW() - INTERVAL '24 hours') as requests_24h,
COUNT(wl.id) FILTER (WHERE wl.success = false AND wl.created_at > NOW() - INTERVAL '24 hours') as errors_24h
FROM webhook_registry wr
LEFT JOIN webhook_logs wl ON wr.id = wl.webhook_id
WHERE 1=1
${input.tableName ? `AND wr.table_name = '${input.tableName}'` : ''}
${input.deploymentEnvironment ? `AND wr.deployment_environment = '${input.deploymentEnvironment}'` : ''}
GROUP BY wr.id
ORDER BY wr.created_at DESC
`;
const result = await executeSqlWithFallback(sql, context);
const summary = {
total_webhooks: result.data.length,
enabled: result.data.filter((w: any) => w.enabled).length,
disabled: result.data.filter((w: any) => !w.enabled).length,
avg_success_rate: result.data.length > 0
? Math.round(result.data.reduce((sum: number, w: any) => sum + w.success_rate, 0) / result.data.length)
: 0,
total_requests_24h: result.data.reduce((sum: number, w: any) => sum + (w.requests_24h || 0), 0),
total_errors_24h: result.data.reduce((sum: number, w: any) => sum + (w.errors_24h || 0), 0)
};
return { data: result.data, summary };
}
async function testWebhook(input: ManageWebhooksInput, context: ToolContext): Promise<any> {
// Get webhook details
const webhookSql = `
SELECT * FROM webhook_registry WHERE id = $1
`;
const webhook = await executeSqlWithFallback(webhookSql, context, [input.webhookId]);
if (webhook.data.length === 0) {
throw new Error("Webhook not found");
}
const webhookData = webhook.data[0];
const startTime = Date.now();
// Simulate webhook test (in real implementation, this would make an actual HTTP request)
const testResult = {
webhook_id: input.webhookId,
test_payload: input.testPayload,
url: webhookData.url,
success: true,
status_code: 200,
response_time_ms: Date.now() - startTime,
response_body: '{"status": "success"}',
timestamp: new Date().toISOString()
};
// Log the test
const logSql = `
INSERT INTO webhook_logs (
webhook_id, event_type, table_name, payload, response_status,
response_time_ms, success, response_body
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`;
await executeSqlWithFallback(logSql, context, [
input.webhookId,
'TEST',
webhookData.table_name,
JSON.stringify(input.testPayload),
testResult.status_code,
testResult.response_time_ms,
testResult.success,
testResult.response_body
]);
return testResult;
}
async function getWebhookLogs(input: ManageWebhooksInput, context: ToolContext): Promise<any> {
const sql = `
SELECT
wl.*,
wr.name as webhook_name,
wr.url
FROM webhook_logs wl
JOIN webhook_registry wr ON wl.webhook_id = wr.id
WHERE 1=1
${input.webhookId ? `AND wl.webhook_id = ${input.webhookId}` : ''}
ORDER BY wl.created_at DESC
LIMIT 100
`;
const result = await executeSqlWithFallback(sql, context);
const stats = {
total: result.data.length,
successful: result.data.filter((log: any) => log.success).length,
failed: result.data.filter((log: any) => !log.success).length,
error_rates: {
last_hour: calculateErrorRate(result.data, 1),
last_24h: calculateErrorRate(result.data, 24),
last_7d: calculateErrorRate(result.data, 168)
}
};
return { data: result.data, ...stats };
}
async function getWebhookAnalytics(input: ManageWebhooksInput, context: ToolContext): Promise<any> {
const analyticsSql = `
SELECT
wr.name,
wr.url,
wr.table_name,
COUNT(wl.id) as total_requests,
COUNT(wl.id) FILTER (WHERE wl.success = true) as successful_requests,
COUNT(wl.id) FILTER (WHERE wl.success = false) as failed_requests,
AVG(wl.response_time_ms) as avg_response_time,
MIN(wl.response_time_ms) as min_response_time,
MAX(wl.response_time_ms) as max_response_time,
DATE_TRUNC('hour', wl.created_at) as hour,
COUNT(*) as requests_per_hour
FROM webhook_registry wr
LEFT JOIN webhook_logs wl ON wr.id = wl.webhook_id
WHERE wl.created_at > NOW() - INTERVAL '7 days'
${input.webhookId ? `AND wr.id = ${input.webhookId}` : ''}
GROUP BY wr.id, wr.name, wr.url, wr.table_name, DATE_TRUNC('hour', wl.created_at)
ORDER BY hour DESC
`;
const result = await executeSqlWithFallback(analyticsSql, context);
const analytics = {
performance: {
avg_response_time: result.data.length > 0
? Math.round(result.data.reduce((sum: number, r: any) => sum + (r.avg_response_time || 0), 0) / result.data.length)
: 0,
total_requests: result.data.reduce((sum: number, r: any) => sum + (r.total_requests || 0), 0),
success_rate: result.data.length > 0
? Math.round((result.data.reduce((sum: number, r: any) => sum + (r.successful_requests || 0), 0) / result.data.reduce((sum: number, r: any) => sum + (r.total_requests || 0), 0)) * 100)
: 0
},
errors: {
total_failed: result.data.reduce((sum: number, r: any) => sum + (r.failed_requests || 0), 0),
common_error_codes: [], // Would be calculated from actual error logs
error_trends: result.data.map((r: any) => ({
hour: r.hour,
errors: r.failed_requests || 0
}))
},
recommendations: generateWebhookRecommendations(result.data)
};
return analytics;
}
async function retryFailedWebhooks(input: ManageWebhooksInput, context: ToolContext): Promise<any> {
// Get failed webhook logs
const failedLogsSql = `
SELECT wl.*, wr.url, wr.headers, wr.auth_config, wr.secret
FROM webhook_logs wl
JOIN webhook_registry wr ON wl.webhook_id = wr.id
WHERE wl.success = false
AND wl.webhook_id = $1
AND wl.retry_count < 3
ORDER BY wl.created_at DESC
LIMIT 50
`;
const failedLogs = await executeSqlWithFallback(failedLogsSql, context, [input.webhookId]);
let retriedCount = 0;
let successfulRetries = 0;
let failedRetries = 0;
for (const log of failedLogs.data) {
try {
// Simulate retry (in real implementation, would make actual HTTP request)
const retrySuccess = Math.random() > 0.3; // 70% success rate for retries
// Update log with retry result
const updateSql = `
UPDATE webhook_logs
SET retry_count = retry_count + 1,
success = $1,
response_status = $2
WHERE id = $3
`;
await executeSqlWithFallback(updateSql, context, [
retrySuccess,
retrySuccess ? 200 : 500,
log.id
]);
retriedCount++;
if (retrySuccess) {
successfulRetries++;
} else {
failedRetries++;
}
} catch (error: any) {
failedRetries++;
}
}
return {
retried_count: retriedCount,
successful_retries: successfulRetries,
failed_retries: failedRetries,
timestamp: new Date().toISOString()
};
}
async function configureWebhookSettings(input: ManageWebhooksInput, context: ToolContext): Promise<any> {
// Check if pg_net extension is available
const pgNetCheckSql = `
SELECT EXISTS(
SELECT 1 FROM pg_extension WHERE extname = 'pg_net'
) as pg_net_available
`;
const pgNetCheck = await executeSqlWithFallback(pgNetCheckSql, context);
const config = {
pg_net_enabled: pgNetCheck.data[0].pg_net_available,
global_settings: {
default_timeout: 30000,
max_retries: 3,
batch_processing: false,
rate_limiting: true
},
infrastructure_ready: true
};
return config;
}
async function batchCreateWebhooks(input: ManageWebhooksInput, context: ToolContext): Promise<any> {
// Get all tables in the schema
const tablesSql = `
SELECT table_name
FROM information_schema.tables
WHERE table_schema = $1
AND table_type = 'BASE TABLE'
ORDER BY table_name
`;
const tables = await executeSqlWithFallback(tablesSql, context, [input.schemaName]);
let createdCount = 0;
const errors = [];
for (const table of tables.data) {
try {
await createWebhook({
...input,
webhookName: `${table.table_name}_webhook`,
tableName: table.table_name
}, context);
createdCount++;
} catch (error: any) {
errors.push({
table: table.table_name,
error: error.message
});
}
}
return {
created_count: createdCount,
tables_processed: tables.data.length,
errors: errors,
success_rate: Math.round((createdCount / tables.data.length) * 100)
};
}
async function deleteWebhook(input: ManageWebhooksInput, context: ToolContext): Promise<any> {
// Get webhook info first
const webhookSql = `
SELECT name, table_name, schema_name FROM webhook_registry WHERE id = $1
`;
const webhook = await executeSqlWithFallback(webhookSql, context, [input.webhookId]);
if (webhook.data.length === 0) {
throw new Error("Webhook not found");
}
const webhookData = webhook.data[0];
// Drop the trigger
const triggerName = `webhook_trigger_${webhookData.table_name}_${input.webhookId}`;
const dropTriggerSql = `DROP TRIGGER IF EXISTS ${triggerName} ON ${webhookData.schema_name}.${webhookData.table_name}`;
try {
await executeSqlWithFallback(dropTriggerSql, context);
} catch (error) {
// Trigger may not exist, continue with deletion
}
// Delete webhook from registry (cascade will handle logs)
const deleteSql = `DELETE FROM webhook_registry WHERE id = $1`;
await executeSqlWithFallback(deleteSql, context, [input.webhookId]);
return {
trigger_dropped: true,
function_dropped: false // We keep the shared function
};
}
function calculateErrorRate(logs: any[], hoursBack: number): number {
const cutoff = new Date(Date.now() - (hoursBack * 60 * 60 * 1000));
const recentLogs = logs.filter(log => new Date(log.created_at) > cutoff);
if (recentLogs.length === 0) return 0;
const failedCount = recentLogs.filter(log => !log.success).length;
return Math.round((failedCount / recentLogs.length) * 100);
}
function generateWebhookRecommendations(analyticsData: any[]): string[] {
const recommendations = [];
const avgResponseTime = analyticsData.length > 0
? analyticsData.reduce((sum, d) => sum + (d.avg_response_time || 0), 0) / analyticsData.length
: 0;
if (avgResponseTime > 5000) {
recommendations.push('Consider optimizing webhook endpoint - response times are high');
}
const totalFailed = analyticsData.reduce((sum, d) => sum + (d.failed_requests || 0), 0);
const totalRequests = analyticsData.reduce((sum, d) => sum + (d.total_requests || 0), 0);
if (totalRequests > 0 && (totalFailed / totalRequests) > 0.1) {
recommendations.push('High failure rate detected - review webhook endpoint reliability');
}
if (totalRequests > 1000) {
recommendations.push('Consider implementing rate limiting for high-volume webhooks');
}
recommendations.push('Monitor webhook performance regularly');
recommendations.push('Implement proper error handling and retry logic');
return recommendations;
}