Superglue MCP
Official, by superglue-ai

postgres.ts (43.1 kB)
import type { ApiConfig, ExtractConfig, Integration, RunResult, TransformConfig, Workflow } from "@superglue/client"; import { Pool, PoolConfig } from 'pg'; import { credentialEncryption } from "../utils/encryption.js"; import { logMessage } from "../utils/logs.js"; import type { DataStore, WorkflowScheduleInternal } from "./types.js"; type ConfigType = 'api' | 'extract' | 'transform' | 'workflow'; type ConfigData = ApiConfig | ExtractConfig | TransformConfig | Workflow; export class PostgresService implements DataStore { private pool: Pool; constructor(config: PoolConfig) { this.pool = new Pool({ ...config, max: 20, min: 2, ssl: config.ssl === false ? false : { rejectUnauthorized: false } }); this.pool.on('error', (err) => { console.error('postgres pool error:', err); }); this.pool.on('connect', () => { logMessage('debug', '🐘 postgres connected'); }); this.initializeTables(); } async getManyWorkflows(params: { ids: string[]; orgId?: string }): Promise<Workflow[]> { const { ids, orgId } = params; const client = await this.pool.connect(); try { const result = await client.query( 'SELECT id, data FROM configurations WHERE id = ANY($1) AND type = $2 AND org_id = $3', [ids, 'workflow', orgId || ''] ); return result.rows.map(row => ({ ...row.data, id: row.id })); } finally { client.release(); } } async getManyIntegrations(params: { ids: string[]; includeDocs?: boolean; orgId?: string }): Promise<Integration[]> { const { ids, includeDocs = true, orgId } = params; const client = await this.pool.connect(); try { let query; if (includeDocs) { query = `SELECT i.id, i.name, i.type, i.url_host, i.url_path, i.credentials, i.documentation_url, i.documentation_pending, i.open_api_url, i.specific_instructions, i.documentation_keywords, i.icon, i.version, i.created_at, i.updated_at, d.documentation, d.open_api_schema FROM integrations i LEFT JOIN integration_details d ON i.id = d.integration_id AND i.org_id = d.org_id WHERE i.id = ANY($1) AND i.org_id = $2`; } else { query = `SELECT id, name, type, url_host, url_path, credentials, documentation_url, documentation_pending, open_api_url, specific_instructions, documentation_keywords, icon, version, created_at, updated_at FROM integrations WHERE id = ANY($1) AND org_id = $2`; } const result = await client.query(query, [ids, orgId || '']); return result.rows.map((row: any) => { const integration: Integration = { id: row.id, name: row.name, type: row.type, urlHost: row.url_host, urlPath: row.url_path, credentials: row.credentials ? credentialEncryption.decrypt(row.credentials) : {}, documentationUrl: row.documentation_url, documentation: includeDocs ? row.documentation : undefined, documentationPending: row.documentation_pending, openApiUrl: row.open_api_url, openApiSchema: includeDocs ? 
row.open_api_schema : undefined, specificInstructions: row.specific_instructions, documentationKeywords: row.documentation_keywords, icon: row.icon, version: row.version, createdAt: row.created_at, updatedAt: row.updated_at }; return integration; }); } finally { client.release(); } } private async initializeTables(): Promise<void> { const client = await this.pool.connect(); try { // Unified configurations table (merged configs + workflows) await client.query(` CREATE TABLE IF NOT EXISTS configurations ( id VARCHAR(255) NOT NULL, org_id VARCHAR(255), type VARCHAR(20) NOT NULL CHECK (type IN ('api', 'extract', 'transform', 'workflow')), version VARCHAR(50), data JSONB NOT NULL, integration_ids VARCHAR(255)[] DEFAULT '{}', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, type, org_id) ) `); // Runs table await client.query(` CREATE TABLE IF NOT EXISTS runs ( id VARCHAR(255) NOT NULL, config_id VARCHAR(255), org_id VARCHAR(255), data JSONB NOT NULL, started_at TIMESTAMP, completed_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, org_id) ) `); // Integrations table (without large fields) await client.query(` CREATE TABLE IF NOT EXISTS integrations ( id VARCHAR(255) NOT NULL, org_id VARCHAR(255), name VARCHAR(255), type VARCHAR(100), url_host VARCHAR(500), url_path VARCHAR(500), credentials JSONB, -- Encrypted JSON object documentation_url VARCHAR(1000), documentation_pending BOOLEAN DEFAULT FALSE, open_api_url VARCHAR(1000), specific_instructions TEXT, documentation_keywords TEXT[], icon VARCHAR(255), version VARCHAR(50), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, org_id) ) `); // New table for large integration fields await client.query(` CREATE TABLE IF NOT EXISTS integration_details ( integration_id VARCHAR(255) NOT NULL, org_id VARCHAR(255), documentation TEXT, open_api_schema TEXT, PRIMARY KEY (integration_id, org_id), FOREIGN KEY (integration_id, org_id) REFERENCES integrations(id, org_id) ON DELETE CASCADE ) `); // Integration templates table for Superglue OAuth credentials (and potentially further fields in the future) await client.query(` CREATE TABLE IF NOT EXISTS integration_templates ( id VARCHAR(255) PRIMARY KEY, sg_client_id VARCHAR(500), sg_client_secret TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) `); // Tenant info table await client.query(` CREATE TABLE IF NOT EXISTS tenant_info ( id VARCHAR(10) DEFAULT 'default', email VARCHAR(255), email_entry_skipped BOOLEAN DEFAULT FALSE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) ) `); await client.query(` CREATE TABLE IF NOT EXISTS workflow_schedules ( id UUID NOT NULL, org_id TEXT NOT NULL, workflow_id TEXT NOT NULL, workflow_type TEXT NOT NULL, cron_expression TEXT NOT NULL, timezone TEXT NOT NULL, enabled BOOLEAN NOT NULL DEFAULT TRUE, payload JSONB, options JSONB, last_run_at TIMESTAMPTZ, next_run_at TIMESTAMPTZ NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, org_id), FOREIGN KEY (workflow_id, workflow_type, org_id) REFERENCES configurations(id, type, org_id) ON DELETE CASCADE ) `); await client.query(` CREATE TABLE IF NOT EXISTS integration_oauth ( uid TEXT PRIMARY KEY, client_id TEXT NOT NULL, client_secret TEXT NOT NULL, expires_at TIMESTAMP NOT NULL, 
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) `); await client.query(`CREATE INDEX IF NOT EXISTS idx_configurations_type_org ON configurations(type, org_id)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_configurations_version ON configurations(version)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_configurations_integration_ids ON configurations USING GIN(integration_ids)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_runs_config_id ON runs(config_id, org_id)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_runs_started_at ON runs(started_at)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_integrations_type ON integrations(type, org_id)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_integrations_url_host ON integrations(url_host)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_integration_details_integration_id ON integration_details(integration_id, org_id)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_workflow_schedules_due ON workflow_schedules(next_run_at, enabled) WHERE enabled = true`); await client.query(`CREATE INDEX IF NOT EXISTS idx_integration_oauth_expires ON integration_oauth(expires_at)`); } finally { client.release(); } } private extractVersion(config: ConfigData): string | null { return (config as any)?.version || null; } private parseDates(data: any): any { if (!data) return data; const result = { ...data }; if (result.createdAt && typeof result.createdAt === 'string') { result.createdAt = new Date(result.createdAt); } if (result.updatedAt && typeof result.updatedAt === 'string') { result.updatedAt = new Date(result.updatedAt); } if (result.startedAt && typeof result.startedAt === 'string') { result.startedAt = new Date(result.startedAt); } if (result.completedAt && typeof result.completedAt === 'string') { result.completedAt = new Date(result.completedAt); } // Parse dates in nested config object if (result.config) { result.config = this.parseDates(result.config); } return result; } private async getConfig<T extends ConfigData>(id: string, type: ConfigType, orgId?: string): Promise<T | null> { if (!id) return null; const client = await this.pool.connect(); try { const result = await client.query( 'SELECT data FROM configurations WHERE id = $1 AND type = $2 AND org_id = $3', [id, type, orgId || ''] ); return result.rows[0] ? 
{ ...this.parseDates(result.rows[0].data), id } : null; } finally { client.release(); } } private async listConfigs<T extends ConfigData>(type: ConfigType, limit = 10, offset = 0, orgId?: string): Promise<{ items: T[], total: number }> { const client = await this.pool.connect(); try { const countResult = await client.query( 'SELECT COUNT(*) FROM configurations WHERE type = $1 AND org_id = $2', [type, orgId || ''] ); const total = parseInt(countResult.rows[0].count); const result = await client.query( 'SELECT id, data FROM configurations WHERE type = $1 AND org_id = $2 ORDER BY created_at DESC LIMIT $3 OFFSET $4', [type, orgId || '', limit, offset] ); const items = result.rows.map(row => ({ ...this.parseDates(row.data), id: row.id })); return { items, total }; } finally { client.release(); } } private async upsertConfig<T extends ConfigData>(id: string, config: T, type: ConfigType, orgId?: string, integrationIds: string[] = []): Promise<T> { if (!id || !config) return null; const client = await this.pool.connect(); try { const version = this.extractVersion(config); await client.query(` INSERT INTO configurations (id, org_id, type, version, data, integration_ids, updated_at) VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP) ON CONFLICT (id, type, org_id) DO UPDATE SET data = $5, version = $4, integration_ids = $6, updated_at = CURRENT_TIMESTAMP `, [id, orgId || '', type, version, JSON.stringify(config), integrationIds]); return { ...config, id }; } finally { client.release(); } } private async deleteConfig(id: string, type: ConfigType, orgId?: string): Promise<boolean> { if (!id) return false; const client = await this.pool.connect(); try { const result = await client.query( 'DELETE FROM configurations WHERE id = $1 AND type = $2 AND org_id = $3', [id, type, orgId || ''] ); return result.rowCount > 0; } finally { client.release(); } } // API Config Methods async getApiConfig(params: { id: string; orgId?: string }): Promise<ApiConfig | null> { const { id, orgId } = params; return this.getConfig<ApiConfig>(id, 'api', orgId); } async listApiConfigs(params?: { limit?: number; offset?: number; orgId?: string }): Promise<{ items: ApiConfig[], total: number }> { const { limit = 10, offset = 0, orgId } = params || {}; return this.listConfigs<ApiConfig>('api', limit, offset, orgId); } async upsertApiConfig(params: { id: string; config: ApiConfig; orgId?: string }): Promise<ApiConfig> { const { id, config, orgId } = params; return this.upsertConfig(id, config, 'api', orgId); } async deleteApiConfig(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId } = params; return this.deleteConfig(id, 'api', orgId); } // Extract Config Methods async getExtractConfig(params: { id: string; orgId?: string }): Promise<ExtractConfig | null> { const { id, orgId } = params; return this.getConfig<ExtractConfig>(id, 'extract', orgId); } async listExtractConfigs(params?: { limit?: number; offset?: number; orgId?: string }): Promise<{ items: ExtractConfig[], total: number }> { const { limit = 10, offset = 0, orgId } = params || {}; return this.listConfigs<ExtractConfig>('extract', limit, offset, orgId); } async upsertExtractConfig(params: { id: string; config: ExtractConfig; orgId?: string }): Promise<ExtractConfig> { const { id, config, orgId } = params; return this.upsertConfig(id, config, 'extract', orgId); } async deleteExtractConfig(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId } = params; return this.deleteConfig(id, 'extract', orgId); } // Transform Config 
Methods async getTransformConfig(params: { id: string; orgId?: string }): Promise<TransformConfig | null> { const { id, orgId } = params; return this.getConfig<TransformConfig>(id, 'transform', orgId); } async listTransformConfigs(params?: { limit?: number; offset?: number; orgId?: string }): Promise<{ items: TransformConfig[], total: number }> { const { limit = 10, offset = 0, orgId } = params || {}; return this.listConfigs<TransformConfig>('transform', limit, offset, orgId); } async upsertTransformConfig(params: { id: string; config: TransformConfig; orgId?: string }): Promise<TransformConfig> { const { id, config, orgId } = params; return this.upsertConfig(id, config, 'transform', orgId); } async deleteTransformConfig(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId } = params; return this.deleteConfig(id, 'transform', orgId); } // Run Result Methods async getRun(params: { id: string; orgId?: string }): Promise<RunResult | null> { const { id, orgId } = params; if (!id) return null; const client = await this.pool.connect(); try { const result = await client.query( 'SELECT data FROM runs WHERE id = $1 AND org_id = $2', [id, orgId || ''] ); if (!result.rows[0]) return null; const run = this.parseDates(result.rows[0].data); return { ...run, id }; } finally { client.release(); } } async listRuns(params?: { limit?: number; offset?: number; configId?: string; orgId?: string }): Promise<{ items: RunResult[], total: number }> { const { limit = 10, offset = 0, configId, orgId } = params || {}; const client = await this.pool.connect(); try { let countQuery = 'SELECT COUNT(*) FROM runs WHERE org_id = $1'; let selectQuery = 'SELECT id, data, started_at FROM runs WHERE org_id = $1'; let params = [orgId || '']; if (configId) { countQuery += ' AND config_id = $2'; selectQuery += ' AND config_id = $2'; params.push(configId); } const countResult = await client.query(countQuery, params); const total = parseInt(countResult.rows[0].count); selectQuery += ' ORDER BY started_at DESC LIMIT $' + (params.length + 1) + ' OFFSET $' + (params.length + 2); params.push(String(limit), String(offset)); const result = await client.query(selectQuery, params); const items = result.rows.map(row => { const run = row.data; return { ...run, id: row.id, startedAt: run.startedAt ? new Date(run.startedAt) : undefined, completedAt: run.completedAt ? new Date(run.completedAt) : undefined }; }); return { items, total }; } finally { client.release(); } } async createRun(params: { result: RunResult; orgId?: string }): Promise<RunResult> { const { result: run, orgId } = params; if (!run) return null; if ((run as any).stepResults) delete (run as any).stepResults; const client = await this.pool.connect(); try { await client.query(` INSERT INTO runs (id, config_id, org_id, data, started_at, completed_at) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (id, org_id) DO UPDATE SET data = $4, started_at = $5, completed_at = $6 `, [ run.id, run.config?.id, orgId || '', JSON.stringify(run), run.startedAt ? run.startedAt.toISOString() : null, run.completedAt ? 
run.completedAt.toISOString() : null ]); return run; } finally { client.release(); } } async deleteRun(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId } = params; if (!id) return false; const client = await this.pool.connect(); try { const result = await client.query( 'DELETE FROM runs WHERE id = $1 AND org_id = $2', [id, orgId || ''] ); return result.rowCount > 0; } finally { client.release(); } } async deleteAllRuns(params?: { orgId?: string }): Promise<boolean> { const { orgId } = params || {}; const client = await this.pool.connect(); try { const result = await client.query( 'DELETE FROM runs WHERE org_id = $1', [orgId || ''] ); return result.rowCount > 0; } finally { client.release(); } } async getWorkflow(params: { id: string; orgId?: string }): Promise<Workflow | null> { const { id, orgId } = params; return this.getConfig<Workflow>(id, 'workflow', orgId); } async listWorkflows(params?: { limit?: number; offset?: number; orgId?: string }): Promise<{ items: Workflow[], total: number }> { const { limit = 10, offset = 0, orgId } = params || {}; return this.listConfigs<Workflow>('workflow', limit, offset, orgId); } async upsertWorkflow(params: { id: string; workflow: Workflow; orgId?: string }): Promise<Workflow> { const { id, workflow, orgId } = params; const integrationIds: string[] = []; return this.upsertConfig(id, workflow, 'workflow', orgId, integrationIds); } async deleteWorkflow(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId } = params; return this.deleteConfig(id, 'workflow', orgId); } // Workflow Schedule Methods async listWorkflowSchedules(params: { workflowId: string, orgId: string }): Promise<WorkflowScheduleInternal[]> { const client = await this.pool.connect(); try { const query = 'SELECT id, org_id, workflow_id, cron_expression, timezone, enabled, payload, options, last_run_at, next_run_at, created_at, updated_at FROM workflow_schedules WHERE workflow_id = $1 AND org_id = $2'; const queryResult = await client.query(query, [params.workflowId, params.orgId]); return queryResult.rows.map(this.mapWorkflowSchedule); } finally { client.release(); } } async getWorkflowSchedule({ id, orgId }: { id: string; orgId?: string }): Promise<WorkflowScheduleInternal | null> { const client = await this.pool.connect(); try { const query = 'SELECT id, org_id, workflow_id, cron_expression, timezone, enabled, payload, options, last_run_at, next_run_at, created_at, updated_at FROM workflow_schedules WHERE id = $1 AND org_id = $2'; const queryResult = await client.query(query, [id, orgId || '']); if (!queryResult.rows[0]) { return null; } return this.mapWorkflowSchedule(queryResult.rows[0]); } finally { client.release(); } } async upsertWorkflowSchedule({ schedule }: { schedule: WorkflowScheduleInternal }): Promise<void> { const client = await this.pool.connect(); try { const query = ` INSERT INTO workflow_schedules (id, org_id, workflow_id, workflow_type, cron_expression, timezone, enabled, payload, options, last_run_at, next_run_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP) ON CONFLICT (id, org_id) DO UPDATE SET cron_expression = $5, timezone = $6, enabled = $7, payload = $8, options = $9, last_run_at = $10, next_run_at = $11, updated_at = CURRENT_TIMESTAMP `; await client.query( query, [ schedule.id, schedule.orgId, schedule.workflowId, 'workflow', schedule.cronExpression, schedule.timezone, schedule.enabled, JSON.stringify(schedule.payload), JSON.stringify(schedule.options), 
schedule.lastRunAt ? schedule.lastRunAt.toISOString() : null, schedule.nextRunAt ? schedule.nextRunAt.toISOString() : null ] ); } finally { client.release(); } } async deleteWorkflowSchedule({ id, orgId }: { id: string, orgId: string }): Promise<boolean> { const client = await this.pool.connect(); try { const result = await client.query('DELETE FROM workflow_schedules WHERE id = $1 AND org_id = $2', [id, orgId]); return result.rowCount > 0; } finally { client.release(); } } async listDueWorkflowSchedules(): Promise<WorkflowScheduleInternal[]> { const client = await this.pool.connect(); // We check for schedules that are enabled and have a next run time that is in the past (all timestamps in the database are in UTC) try { const query = `SELECT id, org_id, workflow_id, cron_expression, timezone, enabled, payload, options, last_run_at, next_run_at, created_at, updated_at FROM workflow_schedules WHERE enabled = true AND next_run_at <= CURRENT_TIMESTAMP at time zone 'utc'`; const queryResult = await client.query(query); return queryResult.rows.map(this.mapWorkflowSchedule); } finally { client.release(); } } async updateScheduleNextRun(params: { id: string; nextRunAt: Date; lastRunAt: Date; }): Promise<boolean> { const client = await this.pool.connect(); try { const query = 'UPDATE workflow_schedules SET next_run_at = $1, last_run_at = $2 WHERE id = $3'; const result = await client.query(query, [params.nextRunAt ? params.nextRunAt.toISOString() : null, params.lastRunAt ? params.lastRunAt.toISOString() : null, params.id]); return result.rowCount > 0; } finally { client.release(); } } private mapWorkflowSchedule(row: any): WorkflowScheduleInternal { return { id: row.id, workflowId: row.workflow_id, orgId: row.org_id, cronExpression: row.cron_expression, timezone: row.timezone, enabled: row.enabled, payload: row.payload, options: row.options, lastRunAt: row.last_run_at, nextRunAt: row.next_run_at, createdAt: row.created_at, updatedAt: row.updated_at }; } // Integration Methods async getIntegration(params: { id: string; includeDocs?: boolean; orgId?: string }): Promise<Integration | null> { const { id, includeDocs = true, orgId } = params; if (!id) return null; const client = await this.pool.connect(); try { let query; if (includeDocs) { query = `SELECT i.id, i.name, i.type, i.url_host, i.url_path, i.credentials, i.documentation_url, i.documentation_pending, i.open_api_url, i.specific_instructions, i.documentation_keywords, i.icon, i.version, i.created_at, i.updated_at, d.documentation, d.open_api_schema FROM integrations i LEFT JOIN integration_details d ON i.id = d.integration_id AND i.org_id = d.org_id WHERE i.id = $1 AND i.org_id = $2`; } else { query = `SELECT id, name, type, url_host, url_path, credentials, documentation_url, documentation_pending, open_api_url, specific_instructions, documentation_keywords, icon, version, created_at, updated_at FROM integrations WHERE id = $1 AND org_id = $2`; } const result = await client.query(query, [id, orgId || '']); if (!result.rows[0]) return null; const row = result.rows[0] as any; const integration: Integration = { id: row.id, name: row.name, type: row.type, urlHost: row.url_host, urlPath: row.url_path, credentials: row.credentials ? credentialEncryption.decrypt(row.credentials) : {}, documentationUrl: row.documentation_url, documentation: includeDocs ? row.documentation : undefined, documentationPending: row.documentation_pending, openApiUrl: row.open_api_url, openApiSchema: includeDocs ? 
row.open_api_schema : undefined, specificInstructions: row.specific_instructions, documentationKeywords: row.documentation_keywords, icon: row.icon, version: row.version, createdAt: row.created_at, updatedAt: row.updated_at }; return integration; } finally { client.release(); } } async listIntegrations(params?: { limit?: number; offset?: number; includeDocs?: boolean; orgId?: string }): Promise<{ items: Integration[], total: number }> { const { limit = 10, offset = 0, includeDocs = false, orgId } = params || {}; const client = await this.pool.connect(); try { const countResult = await client.query( 'SELECT COUNT(*) FROM integrations WHERE org_id = $1', [orgId || ''] ); const total = parseInt(countResult.rows[0].count); let query; if (includeDocs) { query = `SELECT i.id, i.name, i.type, i.url_host, i.url_path, i.credentials, i.documentation_url, i.documentation_pending, i.open_api_url, i.specific_instructions, i.documentation_keywords, i.icon, i.version, i.created_at, i.updated_at, d.documentation, d.open_api_schema FROM integrations i LEFT JOIN integration_details d ON i.id = d.integration_id AND i.org_id = d.org_id WHERE i.org_id = $1 ORDER BY i.created_at DESC LIMIT $2 OFFSET $3`; } else { query = `SELECT id, name, type, url_host, url_path, credentials, documentation_url, documentation_pending, open_api_url, specific_instructions, documentation_keywords, icon, version, created_at, updated_at FROM integrations WHERE org_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`; } const result = await client.query(query, [orgId || '', limit, offset]); const items = result.rows.map((row: any) => { const integration: Integration = { id: row.id, name: row.name, type: row.type, urlHost: row.url_host, urlPath: row.url_path, credentials: row.credentials ? credentialEncryption.decrypt(row.credentials) : {}, documentationUrl: row.documentation_url, documentation: includeDocs ? row.documentation : undefined, documentationPending: row.documentation_pending, openApiUrl: row.open_api_url, openApiSchema: includeDocs ? row.open_api_schema : undefined, specificInstructions: row.specific_instructions, documentationKeywords: row.documentation_keywords, icon: row.icon, version: row.version, createdAt: row.created_at, updatedAt: row.updated_at }; return integration; }); return { items, total }; } finally { client.release(); } } async upsertIntegration(params: { id: string; integration: Integration; orgId?: string }): Promise<Integration> { const { id, integration, orgId } = params; if (!id || !integration) return null; const client = await this.pool.connect(); try { await client.query('BEGIN'); // Encrypt credentials if provided const encryptedCredentials = integration.credentials ? 
credentialEncryption.encrypt(integration.credentials) : null; // Insert/update main integration record await client.query(` INSERT INTO integrations ( id, org_id, name, type, url_host, url_path, credentials, documentation_url, documentation_pending, open_api_url, specific_instructions, documentation_keywords, icon, version, created_at, updated_at ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16 ) ON CONFLICT (id, org_id) DO UPDATE SET name = $3, type = $4, url_host = $5, url_path = $6, credentials = $7, documentation_url = $8, documentation_pending = $9, open_api_url = $10, specific_instructions = $11, documentation_keywords = $12, icon = $13, version = $14, updated_at = $16 `, [ id, orgId || '', integration.name, integration.type, integration.urlHost, integration.urlPath, encryptedCredentials, integration.documentationUrl, integration.documentationPending, integration.openApiUrl, integration.specificInstructions, integration.documentationKeywords, integration.icon, integration.version, integration.createdAt || new Date(), integration.updatedAt || new Date() ]); // Insert/update details if any large fields are provided if (integration.documentation || integration.openApiSchema) { await client.query(` INSERT INTO integration_details ( integration_id, org_id, documentation, open_api_schema ) VALUES ($1, $2, $3, $4) ON CONFLICT (integration_id, org_id) DO UPDATE SET documentation = COALESCE($3, integration_details.documentation), open_api_schema = COALESCE($4, integration_details.open_api_schema) `, [ id, orgId || '', integration.documentation, integration.openApiSchema ]); } await client.query('COMMIT'); return { ...integration, id }; } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } } async deleteIntegration(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId } = params; if (!id) return false; const client = await this.pool.connect(); try { // Delete integration_details first due to foreign key constraint await client.query( 'DELETE FROM integration_details WHERE integration_id = $1 AND org_id = $2', [id, orgId || ''] ); // Then delete the integration const result = await client.query( 'DELETE FROM integrations WHERE id = $1 AND org_id = $2', [id, orgId || ''] ); return result.rowCount > 0; } finally { client.release(); } } async copyTemplateDocumentationToUserIntegration(params: { templateId: string; userIntegrationId: string; orgId?: string }): Promise<boolean> { const { templateId, userIntegrationId, orgId } = params; if (!templateId || !userIntegrationId) return false; const client = await this.pool.connect(); try { // Copy the template documentation (identified by the org_id == 'template') to the user integration const result = await client.query('INSERT INTO integration_details (integration_id, org_id, documentation, open_api_schema) SELECT $1::text, $2::text, documentation, open_api_schema FROM integration_details WHERE integration_id = $3 AND org_id = $4', [userIntegrationId, orgId || '', templateId, 'template']); // return true, if we inserted at least one row return result.rowCount > 0; } finally { client.release(); } } // Tenant Information Methods async getTenantInfo(): Promise<{ email: string | null; emailEntrySkipped: boolean }> { const client = await this.pool.connect(); try { const result = await client.query('SELECT email, email_entry_skipped FROM tenant_info WHERE id = $1', ['default']); if (result.rows[0]) { return { email: result.rows[0].email, emailEntrySkipped: 
result.rows[0].email_entry_skipped }; } return { email: null, emailEntrySkipped: false }; } catch (error) { console.error('Error getting tenant info:', error); return { email: null, emailEntrySkipped: false }; } finally { client.release(); } } async setTenantInfo(params?: { email?: string; emailEntrySkipped?: boolean }): Promise<void> { const { email, emailEntrySkipped } = params || {}; const client = await this.pool.connect(); try { await client.query(` INSERT INTO tenant_info (id, email, email_entry_skipped, updated_at) VALUES ('default', $1, $2, CURRENT_TIMESTAMP) ON CONFLICT (id) DO UPDATE SET email = COALESCE($1, tenant_info.email), email_entry_skipped = COALESCE($2, tenant_info.email_entry_skipped), updated_at = CURRENT_TIMESTAMP `, [email, emailEntrySkipped]); } catch (error) { console.error('Error setting tenant info:', error); } finally { client.release(); } } // Utility methods async clearAll(orgId?: string): Promise<void> { const client = await this.pool.connect(); try { const condition = 'WHERE org_id = $1'; const param = [orgId || '']; await client.query(`DELETE FROM runs ${condition}`, param); await client.query(`DELETE FROM configurations ${condition}`, param); await client.query(`DELETE FROM workflow_schedules ${condition}`, param); await client.query(`DELETE FROM integration_details ${condition}`, param); // Delete details first await client.query(`DELETE FROM integrations ${condition}`, param); } finally { client.release(); } } async disconnect(): Promise<void> { await this.pool.end(); } async ping(): Promise<boolean> { try { const client = await this.pool.connect(); await client.query('SELECT 1'); client.release(); return true; } catch (error) { return false; } } async getTemplateOAuthCredentials(params: { templateId: string }): Promise<{ client_id: string; client_secret: string } | null> { const client = await this.pool.connect(); try { const result = await client.query( 'SELECT sg_client_id, sg_client_secret FROM integration_templates WHERE id = $1', [params.templateId] ); if (!result.rows[0]) return null; const decrypted = credentialEncryption.decrypt({ secret: result.rows[0].sg_client_secret }); return { client_id: result.rows[0].sg_client_id, client_secret: decrypted?.secret || '' }; } catch (error) { logMessage('debug', `No template OAuth credentials found for ${params.templateId}`, { error }); return null; } finally { client.release(); } } async cacheOAuthSecret(params: { uid: string; clientId: string; clientSecret: string; ttlMs: number }): Promise<void> { const client = await this.pool.connect(); try { const expiresAt = new Date(Date.now() + params.ttlMs); const encrypted = credentialEncryption.encrypt({ secret: params.clientSecret }); const encryptedSecret = encrypted?.secret || params.clientSecret; await client.query( `INSERT INTO integration_oauth (uid, client_id, client_secret, expires_at) VALUES ($1, $2, $3, $4) ON CONFLICT (uid) DO UPDATE SET client_id = EXCLUDED.client_id, client_secret = EXCLUDED.client_secret, expires_at = EXCLUDED.expires_at`, [params.uid, params.clientId, encryptedSecret, expiresAt] ); } finally { client.release(); } } async getOAuthSecret(params: { uid: string }): Promise<{ clientId: string; clientSecret: string } | null> { const client = await this.pool.connect(); try { const result = await client.query( `SELECT client_id, client_secret, expires_at FROM integration_oauth WHERE uid = $1`, [params.uid] ); if (result.rows.length === 0) { return null; } const row = result.rows[0]; if (new Date(row.expires_at) <= new Date()) { await 
client.query('DELETE FROM integration_oauth WHERE uid = $1', [params.uid]); return null; } await client.query('DELETE FROM integration_oauth WHERE uid = $1', [params.uid]); const decrypted = credentialEncryption.decrypt({ secret: row.client_secret }); return { clientId: row.client_id, clientSecret: decrypted?.secret || '' }; } finally { client.release(); } } async deleteOAuthSecret(params: { uid: string }): Promise<void> { const client = await this.pool.connect(); try { await client.query('DELETE FROM integration_oauth WHERE uid = $1', [params.uid]); } finally { client.release(); } } }
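
Minimal usage sketch (not part of the source file above): wiring up PostgresService as the DataStore and exercising a few of its methods. The import path "./postgres.js" and the ApiConfig fields shown are assumptions; the actual field set comes from @superglue/client.

// Sketch only: import path and ApiConfig fields are assumed, not confirmed by the listing.
import type { ApiConfig } from "@superglue/client";
import { PostgresService } from "./postgres.js";

async function main() {
  // The PoolConfig is passed straight through to pg's Pool; ssl defaults to
  // { rejectUnauthorized: false } unless it is explicitly set to false.
  const store = new PostgresService({
    host: "localhost",
    port: 5432,
    database: "superglue",
    user: "postgres",
    password: "postgres",
    ssl: false,
  });

  // ping() runs SELECT 1 against the pool and reports reachability.
  console.log("postgres reachable:", await store.ping());

  // Upsert and read back an API config. Records are scoped by orgId;
  // when no orgId is given, an empty string is used internally.
  const config = { id: "my-api", urlHost: "https://api.example.com" } as ApiConfig; // fields are illustrative
  await store.upsertApiConfig({ id: config.id, config, orgId: "acme" });
  const fetched = await store.getApiConfig({ id: "my-api", orgId: "acme" });
  console.log(fetched?.id);

  await store.disconnect();
}

main().catch(console.error);

The same pattern applies to the extract, transform, workflow, run, schedule, and integration methods: each takes a params object with an optional orgId and delegates to the shared configurations, runs, or integrations tables.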

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/superglue-ai/superglue'
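
The same endpoint can be called from TypeScript. A hedged sketch follows (it assumes a runtime with a global fetch, such as Node 18+); the response schema is not documented in this listing, so the result is simply logged as parsed JSON.

// Sketch: fetches the same URL as the curl command above and logs the JSON body.
const res = await fetch("https://glama.ai/api/mcp/v1/servers/superglue-ai/superglue");
if (!res.ok) throw new Error(`request failed: ${res.status}`);
console.log(await res.json());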

If you have feedback or need assistance with the MCP directory API, please join our Discord server.