Skip to main content
Glama
postgres.ts57.1 kB
import type { ApiConfig, DiscoveryRun, FileReference, FileStatus, Integration, Run, RunStatus, Tool } from "@superglue/shared"; import { Pool, PoolConfig } from 'pg'; import { credentialEncryption } from "../utils/encryption.js"; import { logMessage } from "../utils/logs.js"; import { extractRun } from "./migrations/run-migration.js"; import type { DataStore, ToolScheduleInternal } from "./types.js"; type ConfigType = 'api' | 'workflow'; type ConfigData = ApiConfig | Tool; export class PostgresService implements DataStore { private pool: Pool; constructor(config: PoolConfig) { this.pool = new Pool({ ...config, max: 20, min: 2, ssl: config.ssl === false || config.host.includes('localhost') || config.host.includes('127.0.0.1') ? false : { rejectUnauthorized: false } }); this.pool.on('error', (err) => { logMessage('error', 'postgres pool error: ' + (err instanceof Error ? err.message : String(err))); }); this.pool.on('connect', () => { logMessage('debug', 'Datastore: 🐘 postgres connected'); }); this.pool.connect().catch((err) => { logMessage('error', '[CRITICAL] Postgres connection failed: ' + (err instanceof Error ? err.message : String(err))); process.exit(1); }).then((client) => { client?.release(); }); this.initializeTables(); } async getManyIntegrations(params: { ids: string[]; includeDocs?: boolean; orgId?: string }): Promise<Integration[]> { const { ids, includeDocs = true, orgId } = params; const client = await this.pool.connect(); try { let query; if (includeDocs) { query = `SELECT i.id, i.name, i.type, i.url_host, i.url_path, i.credentials, i.documentation_url, i.documentation_pending, i.open_api_url, i.specific_instructions, i.documentation_keywords, i.icon, i.version, i.created_at, i.updated_at, d.documentation, d.open_api_schema FROM integrations i LEFT JOIN integration_details d ON i.id = d.integration_id AND i.org_id = d.org_id WHERE i.id = ANY($1) AND i.org_id = $2`; } else { query = `SELECT id, name, type, url_host, url_path, credentials, documentation_url, documentation_pending, open_api_url, specific_instructions, documentation_keywords, icon, version, created_at, updated_at FROM integrations WHERE id = ANY($1) AND org_id = $2`; } const result = await client.query(query, [ids, orgId || '']); return result.rows.map((row: any) => { const integration: Integration = { id: row.id, name: row.name, type: row.type, urlHost: row.url_host, urlPath: row.url_path, credentials: row.credentials ? credentialEncryption.decrypt(row.credentials) : {}, documentationUrl: row.documentation_url, documentation: includeDocs ? row.documentation : undefined, documentationPending: row.documentation_pending, openApiUrl: row.open_api_url, openApiSchema: includeDocs ? row.open_api_schema : undefined, specificInstructions: row.specific_instructions, documentationKeywords: row.documentation_keywords, icon: row.icon, version: row.version, createdAt: row.created_at, updatedAt: row.updated_at }; return integration; }); } finally { client.release(); } } private async initializeTables(): Promise<void> { const client = await this.pool.connect(); try { // Unified configurations table (merged configs + workflows) await client.query(` CREATE TABLE IF NOT EXISTS configurations ( id VARCHAR(255) NOT NULL, org_id VARCHAR(255), type VARCHAR(20) NOT NULL CHECK (type IN ('api', 'workflow')), version VARCHAR(50), data JSONB NOT NULL, integration_ids VARCHAR(255)[] DEFAULT '{}', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, type, org_id) ) `); // Runs table await client.query(` CREATE TABLE IF NOT EXISTS runs ( id VARCHAR(255) NOT NULL, config_id VARCHAR(255), org_id VARCHAR(255), data JSONB NOT NULL, started_at TIMESTAMP, completed_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, org_id) ) `); // Integrations table (without large fields) await client.query(` CREATE TABLE IF NOT EXISTS integrations ( id VARCHAR(255) NOT NULL, org_id VARCHAR(255), name VARCHAR(255), type VARCHAR(100), url_host VARCHAR(500), url_path VARCHAR(500), credentials JSONB, -- Encrypted JSON object documentation_url VARCHAR(1000), documentation_pending BOOLEAN DEFAULT FALSE, open_api_url VARCHAR(1000), specific_instructions TEXT, documentation_keywords TEXT[], icon VARCHAR(255), version VARCHAR(50), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, org_id) ) `); // New table for large integration fields await client.query(` CREATE TABLE IF NOT EXISTS integration_details ( integration_id VARCHAR(255) NOT NULL, org_id VARCHAR(255), documentation TEXT, open_api_schema TEXT, PRIMARY KEY (integration_id, org_id), FOREIGN KEY (integration_id, org_id) REFERENCES integrations(id, org_id) ON DELETE CASCADE ) `); // Integration templates table for Superglue OAuth credentials (and potentially further fields in the future) await client.query(` CREATE TABLE IF NOT EXISTS integration_templates ( id VARCHAR(255) PRIMARY KEY, sg_client_id VARCHAR(500), sg_client_secret TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) `); // Tenant info table await client.query(` CREATE TABLE IF NOT EXISTS tenant_info ( id VARCHAR(10) DEFAULT 'default', email VARCHAR(255), email_entry_skipped BOOLEAN DEFAULT FALSE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) ) `); await client.query(` CREATE TABLE IF NOT EXISTS workflow_schedules ( id UUID NOT NULL, org_id TEXT NOT NULL, workflow_id TEXT NOT NULL, workflow_type TEXT NOT NULL, cron_expression TEXT NOT NULL, timezone TEXT NOT NULL, enabled BOOLEAN NOT NULL DEFAULT TRUE, payload JSONB, options JSONB, last_run_at TIMESTAMPTZ, next_run_at TIMESTAMPTZ NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, org_id), FOREIGN KEY (workflow_id, workflow_type, org_id) REFERENCES configurations(id, type, org_id) ON DELETE CASCADE ) `); await client.query(` CREATE TABLE IF NOT EXISTS integration_oauth ( uid TEXT PRIMARY KEY, client_id TEXT NOT NULL, client_secret TEXT NOT NULL, expires_at TIMESTAMP NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) `); await client.query(` CREATE TABLE IF NOT EXISTS discovery_runs ( id VARCHAR(255) NOT NULL, org_id VARCHAR(255) NOT NULL, sources JSONB NOT NULL, data JSONB, status VARCHAR(50) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, org_id) ) `); await client.query(` CREATE TABLE IF NOT EXISTS file_references ( id VARCHAR(255) NOT NULL, org_id VARCHAR(255) NOT NULL, storage_uri TEXT NOT NULL, processed_storage_uri TEXT, metadata JSONB NOT NULL, status VARCHAR(50) NOT NULL, error TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id, org_id) ) `); await client.query(`CREATE INDEX IF NOT EXISTS idx_configurations_type_org ON configurations(type, org_id)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_configurations_version ON configurations(version)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_configurations_integration_ids ON configurations USING GIN(integration_ids)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_runs_config_id ON runs(config_id, org_id)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_runs_started_at ON runs(started_at)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_integrations_type ON integrations(type, org_id)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_integrations_url_host ON integrations(url_host)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_integration_details_integration_id ON integration_details(integration_id, org_id)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_workflow_schedules_due ON workflow_schedules(next_run_at, enabled) WHERE enabled = true`); await client.query(`CREATE INDEX IF NOT EXISTS idx_integration_oauth_expires ON integration_oauth(expires_at)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_discovery_runs_org_status ON discovery_runs(org_id, status)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_file_references_org_id ON file_references(org_id, id)`); await client.query(`CREATE INDEX IF NOT EXISTS idx_file_references_org_status ON file_references(org_id, status)`); } finally { client.release(); } } private extractVersion(config: ConfigData): string | null { return (config as any)?.version || null; } private parseDates(data: any): any { if (!data) return data; const result = { ...data }; if (result.createdAt && typeof result.createdAt === 'string') { result.createdAt = new Date(result.createdAt); } if (result.updatedAt && typeof result.updatedAt === 'string') { result.updatedAt = new Date(result.updatedAt); } if (result.startedAt && typeof result.startedAt === 'string') { result.startedAt = new Date(result.startedAt); } if (result.completedAt && typeof result.completedAt === 'string') { result.completedAt = new Date(result.completedAt); } // Parse dates in nested config object if (result.config) { result.config = this.parseDates(result.config); } return result; } private async getConfig<T extends ConfigData>(id: string, type: ConfigType, orgId?: string): Promise<T | null> { if (!id) return null; const client = await this.pool.connect(); try { const result = await client.query( 'SELECT data FROM configurations WHERE id = $1 AND type = $2 AND org_id = $3', [id, type, orgId || ''] ); return result.rows[0] ? { ...this.parseDates(result.rows[0].data), id } : null; } finally { client.release(); } } private async listConfigs<T extends ConfigData>(type: ConfigType, limit = 10, offset = 0, orgId?: string): Promise<{ items: T[], total: number }> { const client = await this.pool.connect(); try { const countResult = await client.query( 'SELECT COUNT(*) FROM configurations WHERE type = $1 AND org_id = $2', [type, orgId || ''] ); const total = parseInt(countResult.rows[0].count); const result = await client.query( 'SELECT id, data FROM configurations WHERE type = $1 AND org_id = $2 ORDER BY created_at DESC LIMIT $3 OFFSET $4', [type, orgId || '', limit, offset] ); const items = result.rows.map(row => ({ ...this.parseDates(row.data), id: row.id })); return { items, total }; } finally { client.release(); } } private async upsertConfig<T extends ConfigData>(id: string, config: T, type: ConfigType, orgId?: string, integrationIds: string[] = []): Promise<T> { if (!id || !config) return null; const client = await this.pool.connect(); try { const version = this.extractVersion(config); await client.query(` INSERT INTO configurations (id, org_id, type, version, data, integration_ids, updated_at) VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP) ON CONFLICT (id, type, org_id) DO UPDATE SET data = $5, version = $4, integration_ids = $6, updated_at = CURRENT_TIMESTAMP `, [id, orgId || '', type, version, JSON.stringify(config), integrationIds]); return { ...config, id }; } finally { client.release(); } } private async deleteConfig(id: string, type: ConfigType, orgId?: string): Promise<boolean> { if (!id) return false; const client = await this.pool.connect(); try { const result = await client.query( 'DELETE FROM configurations WHERE id = $1 AND type = $2 AND org_id = $3', [id, type, orgId || ''] ); return result.rowCount > 0; } finally { client.release(); } } // API Config Methods async getApiConfig(params: { id: string; orgId?: string }): Promise<ApiConfig | null> { const { id, orgId } = params; return this.getConfig<ApiConfig>(id, 'api', orgId); } async listApiConfigs(params?: { limit?: number; offset?: number; orgId?: string }): Promise<{ items: ApiConfig[], total: number }> { const { limit = 10, offset = 0, orgId } = params || {}; return this.listConfigs<ApiConfig>('api', limit, offset, orgId); } async upsertApiConfig(params: { id: string; config: ApiConfig; orgId?: string }): Promise<ApiConfig> { const { id, config, orgId } = params; return this.upsertConfig(id, config, 'api', orgId); } async deleteApiConfig(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId } = params; return this.deleteConfig(id, 'api', orgId); } // Run Methods async getRun(params: { id: string; orgId?: string }): Promise<Run | null> { const { id, orgId } = params; if (!id) return null; const client = await this.pool.connect(); try { const result = await client.query( 'SELECT id, config_id, data, started_at, completed_at FROM runs WHERE id = $1 AND org_id = $2', [id, orgId || ''] ); if (!result.rows[0]) return null; const row = result.rows[0]; return extractRun(row.data, { id: row.id, config_id: row.config_id, started_at: row.started_at, completed_at: row.completed_at }); } finally { client.release(); } } async listRuns(params?: { limit?: number; offset?: number; configId?: string; status?: RunStatus; orgId?: string }): Promise<{ items: Run[], total: number }> { const { limit = 10, offset = 0, configId, status, orgId } = params || {}; const client = await this.pool.connect(); try { let selectQuery = ` SELECT id, config_id, data, started_at, completed_at, COUNT(*) OVER() as total_count FROM runs WHERE org_id = $1 `; const queryParams: string[] = [orgId || '']; if (configId) { selectQuery += ' AND config_id = $2'; queryParams.push(configId); } if (status !== undefined) { const paramIndex = queryParams.length + 1; selectQuery += ` AND data->>'status' = $${paramIndex}`; queryParams.push(status); } selectQuery += ' ORDER BY started_at DESC LIMIT $' + (queryParams.length + 1) + ' OFFSET $' + (queryParams.length + 2); queryParams.push(String(limit), String(offset)); const result = await client.query(selectQuery, queryParams); const total = result.rows.length > 0 ? parseInt(result.rows[0].total_count) : 0; const items = result.rows.map(row => extractRun(row.data, { id: row.id, config_id: row.config_id, started_at: row.started_at, completed_at: row.completed_at })); return { items, total }; } finally { client.release(); } } async createRun(params: { run: Run }): Promise<Run> { const { run } = params; if (!run) throw new Error('Run is required'); const client = await this.pool.connect(); try { const result = await client.query(` INSERT INTO runs (id, config_id, org_id, data, started_at, completed_at) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (id, org_id) DO NOTHING `, [ run.id, run.toolId, run.orgId || '', JSON.stringify(run), run.startedAt ? run.startedAt.toISOString() : null, run.completedAt ? run.completedAt.toISOString() : null ]); if (result.rowCount === 0) { throw new Error(`Run with id ${run.id} already exists`); } return run; } finally { client.release(); } } async updateRun(params: { id: string; orgId: string; updates: Partial<Run> }): Promise<Run> { const { id, orgId, updates } = params; const client = await this.pool.connect(); try { const existingResult = await client.query( 'SELECT id, config_id, data, started_at, completed_at FROM runs WHERE id = $1 AND org_id = $2', [id, orgId] ); if (!existingResult.rows[0]) { throw new Error(`Run with id ${id} not found`); } const row = existingResult.rows[0]; const existingRun = extractRun(row.data, { id: row.id, config_id: row.config_id, started_at: row.started_at, completed_at: row.completed_at }); const updatedRun: Run = { ...existingRun, ...updates, id, orgId, startedAt: existingRun.startedAt, }; await client.query(` UPDATE runs SET data = $1, completed_at = $2 WHERE id = $3 AND org_id = $4 `, [ JSON.stringify(updatedRun), updatedRun.completedAt ? updatedRun.completedAt.toISOString() : null, id, orgId ]); return updatedRun; } finally { client.release(); } } async getWorkflow(params: { id: string; orgId?: string }): Promise<Tool | null> { const { id, orgId } = params; return this.getConfig<Tool>(id, 'workflow', orgId); } async listWorkflows(params?: { limit?: number; offset?: number; orgId?: string }): Promise<{ items: Tool[], total: number }> { const { limit = 10, offset = 0, orgId } = params || {}; return this.listConfigs<Tool>('workflow', limit, offset, orgId); } async upsertWorkflow(params: { id: string; workflow: Tool; orgId?: string }): Promise<Tool> { const { id, workflow, orgId } = params; const integrationIds: string[] = []; return this.upsertConfig(id, workflow, 'workflow', orgId, integrationIds); } async deleteWorkflow(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId } = params; return this.deleteConfig(id, 'workflow', orgId); } async renameWorkflow(params: { oldId: string; newId: string; orgId?: string }): Promise<Tool> { const { oldId, newId, orgId = '' } = params; const client = await this.pool.connect(); try { await client.query('BEGIN'); // 1. Check if newId already exists const existingCheck = await client.query( 'SELECT id FROM configurations WHERE id = $1 AND type = $2 AND org_id = $3', [newId, 'workflow', orgId] ); if (existingCheck.rows.length > 0) { throw new Error(`Workflow with ID '${newId}' already exists`); } // 2. Get old workflow const oldWorkflowResult = await client.query( 'SELECT data FROM configurations WHERE id = $1 AND type = $2 AND org_id = $3', [oldId, 'workflow', orgId] ); if (oldWorkflowResult.rows.length === 0) { throw new Error(`Workflow with ID '${oldId}' not found`); } const oldWorkflow = oldWorkflowResult.rows[0].data as Tool; const now = new Date(); // 3. Create new workflow with newId const newWorkflow: Tool = { ...oldWorkflow, id: newId, updatedAt: now }; const integrationIds: string[] = []; await client.query( `INSERT INTO configurations (id, org_id, type, version, data, integration_ids, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, [ newId, orgId, 'workflow', newWorkflow.version || null, newWorkflow, integrationIds, newWorkflow.createdAt || now, now ] ); // 4. CRITICAL: Update workflow_schedules BEFORE deleting old workflow (to avoid CASCADE delete) await client.query( `UPDATE workflow_schedules SET workflow_id = $1, updated_at = CURRENT_TIMESTAMP WHERE workflow_id = $2 AND workflow_type = 'workflow' AND org_id = $3`, [newId, oldId, orgId] ); // 5. Delete old workflow await client.query( 'DELETE FROM configurations WHERE id = $1 AND type = $2 AND org_id = $3', [oldId, 'workflow', orgId] ); await client.query('COMMIT'); return newWorkflow; } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } } // Workflow Schedule Methods async listWorkflowSchedules(params: { workflowId?: string, orgId: string }): Promise<ToolScheduleInternal[]> { const client = await this.pool.connect(); try { let query: string; let queryParams: string[]; if (params.workflowId) { query = 'SELECT id, org_id, workflow_id, cron_expression, timezone, enabled, payload, options, last_run_at, next_run_at, created_at, updated_at FROM workflow_schedules WHERE workflow_id = $1 AND org_id = $2'; queryParams = [params.workflowId, params.orgId]; } else { query = 'SELECT id, org_id, workflow_id, cron_expression, timezone, enabled, payload, options, last_run_at, next_run_at, created_at, updated_at FROM workflow_schedules WHERE org_id = $1'; queryParams = [params.orgId]; } const queryResult = await client.query(query, queryParams); return queryResult.rows.map(this.mapWorkflowSchedule); } finally { client.release(); } } async getWorkflowSchedule({ id, orgId }: { id: string; orgId?: string }): Promise<ToolScheduleInternal | null> { const client = await this.pool.connect(); try { const query = 'SELECT id, org_id, workflow_id, cron_expression, timezone, enabled, payload, options, last_run_at, next_run_at, created_at, updated_at FROM workflow_schedules WHERE id = $1 AND org_id = $2'; const queryResult = await client.query(query, [id, orgId || '']); if (!queryResult.rows[0]) { return null; } return this.mapWorkflowSchedule(queryResult.rows[0]); } finally { client.release(); } } async upsertWorkflowSchedule({ schedule }: { schedule: ToolScheduleInternal }): Promise<void> { const client = await this.pool.connect(); try { const query = ` INSERT INTO workflow_schedules (id, org_id, workflow_id, workflow_type, cron_expression, timezone, enabled, payload, options, last_run_at, next_run_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP) ON CONFLICT (id, org_id) DO UPDATE SET cron_expression = $5, timezone = $6, enabled = $7, payload = $8, options = $9, last_run_at = $10, next_run_at = $11, updated_at = CURRENT_TIMESTAMP `; await client.query( query, [ schedule.id, schedule.orgId, schedule.workflowId, 'workflow', schedule.cronExpression, schedule.timezone, schedule.enabled, JSON.stringify(schedule.payload), JSON.stringify(schedule.options), schedule.lastRunAt ? schedule.lastRunAt.toISOString() : null, schedule.nextRunAt ? schedule.nextRunAt.toISOString() : null ] ); } finally { client.release(); } } async deleteWorkflowSchedule({ id, orgId }: { id: string, orgId: string }): Promise<boolean> { const client = await this.pool.connect(); try { const result = await client.query('DELETE FROM workflow_schedules WHERE id = $1 AND org_id = $2', [id, orgId]); return result.rowCount > 0; } finally { client.release(); } } async listDueWorkflowSchedules(): Promise<ToolScheduleInternal[]> { const client = await this.pool.connect(); // We check for schedules that are enabled and have a next run time that is in the past (all timestamps in the database are in UTC) try { const query = `SELECT id, org_id, workflow_id, cron_expression, timezone, enabled, payload, options, last_run_at, next_run_at, created_at, updated_at FROM workflow_schedules WHERE enabled = true AND next_run_at <= CURRENT_TIMESTAMP at time zone 'utc'`; const queryResult = await client.query(query); return queryResult.rows.map(this.mapWorkflowSchedule); } finally { client.release(); } } async updateScheduleNextRun(params: { id: string; nextRunAt: Date; lastRunAt: Date; }): Promise<boolean> { const client = await this.pool.connect(); try { const query = 'UPDATE workflow_schedules SET next_run_at = $1, last_run_at = $2 WHERE id = $3'; const result = await client.query(query, [params.nextRunAt ? params.nextRunAt.toISOString() : null, params.lastRunAt ? params.lastRunAt.toISOString() : null, params.id]); return result.rowCount > 0; } finally { client.release(); } } private mapWorkflowSchedule(row: any): ToolScheduleInternal { return { id: row.id, workflowId: row.workflow_id, orgId: row.org_id, cronExpression: row.cron_expression, timezone: row.timezone, enabled: row.enabled, payload: row.payload, options: row.options, lastRunAt: row.last_run_at, nextRunAt: row.next_run_at, createdAt: row.created_at, updatedAt: row.updated_at }; } // Integration Methods async getIntegration(params: { id: string; includeDocs?: boolean; orgId?: string }): Promise<Integration | null> { const { id, includeDocs = true, orgId } = params; if (!id) return null; const client = await this.pool.connect(); try { let query; if (includeDocs) { query = `SELECT i.id, i.name, i.type, i.url_host, i.url_path, i.credentials, i.documentation_url, i.documentation_pending, i.open_api_url, i.specific_instructions, i.documentation_keywords, i.icon, i.version, i.created_at, i.updated_at, d.documentation, d.open_api_schema FROM integrations i LEFT JOIN integration_details d ON i.id = d.integration_id AND i.org_id = d.org_id WHERE i.id = $1 AND i.org_id = $2`; } else { query = `SELECT id, name, type, url_host, url_path, credentials, documentation_url, documentation_pending, open_api_url, specific_instructions, documentation_keywords, icon, version, created_at, updated_at FROM integrations WHERE id = $1 AND org_id = $2`; } const result = await client.query(query, [id, orgId || '']); if (!result.rows[0]) return null; const row = result.rows[0] as any; const integration: Integration = { id: row.id, name: row.name, type: row.type, urlHost: row.url_host, urlPath: row.url_path, credentials: row.credentials ? credentialEncryption.decrypt(row.credentials) : {}, documentationUrl: row.documentation_url, documentation: includeDocs ? row.documentation : undefined, documentationPending: row.documentation_pending, openApiUrl: row.open_api_url, openApiSchema: includeDocs ? row.open_api_schema : undefined, specificInstructions: row.specific_instructions, documentationKeywords: row.documentation_keywords, icon: row.icon, version: row.version, createdAt: row.created_at, updatedAt: row.updated_at }; return integration; } finally { client.release(); } } async listIntegrations(params?: { limit?: number; offset?: number; includeDocs?: boolean; orgId?: string }): Promise<{ items: Integration[], total: number }> { const { limit = 10, offset = 0, includeDocs = false, orgId } = params || {}; const client = await this.pool.connect(); try { const countResult = await client.query( 'SELECT COUNT(*) FROM integrations WHERE org_id = $1', [orgId || ''] ); const total = parseInt(countResult.rows[0].count); let query; if (includeDocs) { query = `SELECT i.id, i.name, i.type, i.url_host, i.url_path, i.credentials, i.documentation_url, i.documentation_pending, i.open_api_url, i.specific_instructions, i.documentation_keywords, i.icon, i.version, i.created_at, i.updated_at, d.documentation, d.open_api_schema FROM integrations i LEFT JOIN integration_details d ON i.id = d.integration_id AND i.org_id = d.org_id WHERE i.org_id = $1 ORDER BY i.created_at DESC LIMIT $2 OFFSET $3`; } else { query = `SELECT id, name, type, url_host, url_path, credentials, documentation_url, documentation_pending, open_api_url, specific_instructions, documentation_keywords, icon, version, created_at, updated_at FROM integrations WHERE org_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`; } const result = await client.query(query, [orgId || '', limit, offset]); const items = result.rows.map((row: any) => { const integration: Integration = { id: row.id, name: row.name, type: row.type, urlHost: row.url_host, urlPath: row.url_path, credentials: row.credentials ? credentialEncryption.decrypt(row.credentials) : {}, documentationUrl: row.documentation_url, documentation: includeDocs ? row.documentation : undefined, documentationPending: row.documentation_pending, openApiUrl: row.open_api_url, openApiSchema: includeDocs ? row.open_api_schema : undefined, specificInstructions: row.specific_instructions, documentationKeywords: row.documentation_keywords, icon: row.icon, version: row.version, createdAt: row.created_at, updatedAt: row.updated_at }; return integration; }); return { items, total }; } finally { client.release(); } } async upsertIntegration(params: { id: string; integration: Integration; orgId?: string }): Promise<Integration> { const { id, integration, orgId } = params; if (!id || !integration) return null; const client = await this.pool.connect(); try { await client.query('BEGIN'); // Encrypt credentials if provided const encryptedCredentials = integration.credentials ? credentialEncryption.encrypt(integration.credentials) : null; // Insert/update main integration record await client.query(` INSERT INTO integrations ( id, org_id, name, type, url_host, url_path, credentials, documentation_url, documentation_pending, open_api_url, specific_instructions, documentation_keywords, icon, version, created_at, updated_at ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16 ) ON CONFLICT (id, org_id) DO UPDATE SET name = $3, type = $4, url_host = $5, url_path = $6, credentials = $7, documentation_url = $8, documentation_pending = $9, open_api_url = $10, specific_instructions = $11, documentation_keywords = $12, icon = $13, version = $14, updated_at = $16 `, [ id, orgId || '', integration.name, integration.type, integration.urlHost, integration.urlPath, encryptedCredentials, integration.documentationUrl, integration.documentationPending, integration.openApiUrl, integration.specificInstructions, integration.documentationKeywords, integration.icon, integration.version, integration.createdAt || new Date(), integration.updatedAt || new Date() ]); // Insert/update details if any large fields are provided if (integration.documentation || integration.openApiSchema) { await client.query(` INSERT INTO integration_details ( integration_id, org_id, documentation, open_api_schema ) VALUES ($1, $2, $3, $4) ON CONFLICT (integration_id, org_id) DO UPDATE SET documentation = COALESCE($3, integration_details.documentation), open_api_schema = COALESCE($4, integration_details.open_api_schema) `, [ id, orgId || '', integration.documentation, integration.openApiSchema ]); } await client.query('COMMIT'); return { ...integration, id }; } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } } async deleteIntegration(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId } = params; if (!id) return false; const client = await this.pool.connect(); try { // Delete integration_details first due to foreign key constraint await client.query( 'DELETE FROM integration_details WHERE integration_id = $1 AND org_id = $2', [id, orgId || ''] ); // Then delete the integration const result = await client.query( 'DELETE FROM integrations WHERE id = $1 AND org_id = $2', [id, orgId || ''] ); return result.rowCount > 0; } finally { client.release(); } } async copyTemplateDocumentationToUserIntegration(params: { templateId: string; userIntegrationId: string; orgId?: string }): Promise<boolean> { const { templateId, userIntegrationId, orgId } = params; if (!templateId || !userIntegrationId) return false; const client = await this.pool.connect(); try { // Copy the template documentation (identified by the org_id == 'template') to the user integration const result = await client.query('INSERT INTO integration_details (integration_id, org_id, documentation, open_api_schema) SELECT $1::text, $2::text, documentation, open_api_schema FROM integration_details WHERE integration_id = $3 AND org_id = $4', [userIntegrationId, orgId || '', templateId, 'template']); // return true, if we inserted at least one row return result.rowCount > 0; } finally { client.release(); } } // Tenant Information Methods async getTenantInfo(): Promise<{ email: string | null; emailEntrySkipped: boolean }> { const client = await this.pool.connect(); try { const result = await client.query('SELECT email, email_entry_skipped FROM tenant_info WHERE id = $1', ['default']); if (result.rows[0]) { return { email: result.rows[0].email, emailEntrySkipped: result.rows[0].email_entry_skipped }; } return { email: null, emailEntrySkipped: false }; } catch (error) { console.error('Error getting tenant info:', error); return { email: null, emailEntrySkipped: false }; } finally { client.release(); } } async setTenantInfo(params?: { email?: string; emailEntrySkipped?: boolean }): Promise<void> { const { email, emailEntrySkipped } = params || {}; const client = await this.pool.connect(); try { await client.query(` INSERT INTO tenant_info (id, email, email_entry_skipped, updated_at) VALUES ('default', $1, $2, CURRENT_TIMESTAMP) ON CONFLICT (id) DO UPDATE SET email = COALESCE($1, tenant_info.email), email_entry_skipped = COALESCE($2, tenant_info.email_entry_skipped), updated_at = CURRENT_TIMESTAMP `, [email, emailEntrySkipped]); } catch (error) { console.error('Error setting tenant info:', error); } finally { client.release(); } } // Utility methods async clearAll(orgId?: string): Promise<void> { const client = await this.pool.connect(); try { const condition = 'WHERE org_id = $1'; const param = [orgId || '']; await client.query(`DELETE FROM runs ${condition}`, param); await client.query(`DELETE FROM configurations ${condition}`, param); await client.query(`DELETE FROM workflow_schedules ${condition}`, param); await client.query(`DELETE FROM integration_details ${condition}`, param); // Delete details first await client.query(`DELETE FROM integrations ${condition}`, param); } finally { client.release(); } } async disconnect(): Promise<void> { await this.pool.end(); } async ping(): Promise<boolean> { try { const client = await this.pool.connect(); await client.query('SELECT 1'); client.release(); return true; } catch (error) { return false; } } async getTemplateOAuthCredentials(params: { templateId: string }): Promise<{ client_id: string; client_secret: string } | null> { const client = await this.pool.connect(); try { const result = await client.query( 'SELECT sg_client_id, sg_client_secret FROM integration_templates WHERE id = $1', [params.templateId] ); if (!result.rows[0]) return null; const decrypted = credentialEncryption.decrypt({ secret: result.rows[0].sg_client_secret }); return { client_id: result.rows[0].sg_client_id, client_secret: decrypted?.secret || '' }; } catch (error) { logMessage('debug', `No template OAuth credentials found for ${params.templateId}: ` + (error instanceof Error ? error.message : String(error))); return null; } finally { client.release(); } } async cacheOAuthSecret(params: { uid: string; clientId: string; clientSecret: string; ttlMs: number }): Promise<void> { const client = await this.pool.connect(); try { const expiresAt = new Date(Date.now() + params.ttlMs); const encrypted = credentialEncryption.encrypt({ secret: params.clientSecret }); const encryptedSecret = encrypted?.secret || params.clientSecret; await client.query( `INSERT INTO integration_oauth (uid, client_id, client_secret, expires_at) VALUES ($1, $2, $3, $4) ON CONFLICT (uid) DO UPDATE SET client_id = EXCLUDED.client_id, client_secret = EXCLUDED.client_secret, expires_at = EXCLUDED.expires_at`, [params.uid, params.clientId, encryptedSecret, expiresAt] ); } finally { client.release(); } } async getOAuthSecret(params: { uid: string }): Promise<{ clientId: string; clientSecret: string } | null> { const client = await this.pool.connect(); try { const result = await client.query( `SELECT client_id, client_secret, expires_at FROM integration_oauth WHERE uid = $1`, [params.uid] ); if (result.rows.length === 0) { return null; } const row = result.rows[0]; if (new Date(row.expires_at) <= new Date()) { await client.query('DELETE FROM integration_oauth WHERE uid = $1', [params.uid]); return null; } await client.query('DELETE FROM integration_oauth WHERE uid = $1', [params.uid]); const decrypted = credentialEncryption.decrypt({ secret: row.client_secret }); return { clientId: row.client_id, clientSecret: decrypted?.secret || '' }; } finally { client.release(); } } async createDiscoveryRun(params: { run: DiscoveryRun; orgId?: string }): Promise<DiscoveryRun> { const { run, orgId = '' } = params; const client = await this.pool.connect(); try { await client.query( `INSERT INTO discovery_runs (id, org_id, sources, data, status, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`, [run.id, orgId, JSON.stringify(run.sources), JSON.stringify(run.data), run.status] ); return run; } finally { client.release(); } } async getDiscoveryRun(params: { id: string; orgId?: string }): Promise<DiscoveryRun | null> { const { id, orgId = '' } = params; const client = await this.pool.connect(); try { const result = await client.query( `SELECT id, sources, data, status, created_at FROM discovery_runs WHERE id = $1 AND org_id = $2`, [id, orgId] ); if (result.rows.length === 0) return null; const row = result.rows[0]; return { id: row.id, sources: row.sources || [], data: row.data, status: row.status, createdAt: new Date(row.created_at) }; } finally { client.release(); } } async updateDiscoveryRun(params: { id: string; updates: Partial<DiscoveryRun>; orgId?: string }): Promise<DiscoveryRun> { const { id, updates, orgId = '' } = params; const client = await this.pool.connect(); try { const setClauses: string[] = []; const values: any[] = []; let paramCount = 1; if (updates.sources !== undefined) { setClauses.push(`sources = $${paramCount++}`); values.push(JSON.stringify(updates.sources)); } if (updates.data !== undefined) { setClauses.push(`data = $${paramCount++}`); values.push(JSON.stringify(updates.data)); } if (updates.status !== undefined) { setClauses.push(`status = $${paramCount++}`); values.push(updates.status); } setClauses.push(`updated_at = CURRENT_TIMESTAMP`); values.push(id, orgId); await client.query( `UPDATE discovery_runs SET ${setClauses.join(', ')} WHERE id = $${paramCount++} AND org_id = $${paramCount++}`, values ); const updated = await this.getDiscoveryRun({ id, orgId }); if (!updated) throw new Error('Failed to retrieve updated discovery run'); return updated; } finally { client.release(); } } async listDiscoveryRuns(params?: { limit?: number; offset?: number; orgId?: string }): Promise<{ items: DiscoveryRun[], total: number }> { const { limit = 10, offset = 0, orgId = '' } = params || {}; const client = await this.pool.connect(); try { const countResult = await client.query( `SELECT COUNT(*) FROM discovery_runs WHERE org_id = $1`, [orgId] ); const total = parseInt(countResult.rows[0].count); const result = await client.query( `SELECT id, org_id, sources, data, status, created_at FROM discovery_runs WHERE org_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`, [orgId, limit, offset] ); const items = result.rows.map(row => ({ id: row.id, sources: row.sources || [], data: row.data, status: row.status, createdAt: new Date(row.created_at) })); return { items, total }; } finally { client.release(); } } async deleteDiscoveryRun(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId = '' } = params; const client = await this.pool.connect(); try { const result = await client.query( `DELETE FROM discovery_runs WHERE id = $1 AND org_id = $2`, [id, orgId] ); return (result.rowCount || 0) > 0; } finally { client.release(); } } async createFileReference(params: { file: FileReference; orgId?: string }): Promise<FileReference> { const { file, orgId = '' } = params; const client = await this.pool.connect(); try { await client.query( `INSERT INTO file_references (id, org_id, storage_uri, processed_storage_uri, metadata, status, error, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`, [file.id, orgId, file.storageUri, file.processedStorageUri || null, JSON.stringify(file.metadata), file.status, file.error || null] ); // Fetch and return the complete record with database-generated fields const created = await this.getFileReference({ id: file.id, orgId }); if (!created) throw new Error('Failed to retrieve created file reference'); return created; } finally { client.release(); } } async getFileReference(params: { id: string; orgId?: string }): Promise<FileReference | null> { const { id, orgId = '' } = params; const client = await this.pool.connect(); try { const result = await client.query( `SELECT id, storage_uri, processed_storage_uri, metadata, status, error, created_at FROM file_references WHERE id = $1 AND org_id = $2`, [id, orgId] ); if (result.rows.length === 0) return null; const row = result.rows[0]; return { id: row.id, storageUri: row.storage_uri, processedStorageUri: row.processed_storage_uri || undefined, metadata: row.metadata, status: row.status, error: row.error || undefined, createdAt: row.created_at ? new Date(row.created_at) : undefined }; } finally { client.release(); } } async updateFileReference(params: { id: string; updates: Partial<FileReference>; orgId?: string }): Promise<FileReference> { const { id, updates, orgId = '' } = params; const client = await this.pool.connect(); try { const setClauses: string[] = []; const values: any[] = []; let paramCount = 1; if (updates.storageUri !== undefined) { setClauses.push(`storage_uri = $${paramCount++}`); values.push(updates.storageUri); } if (updates.processedStorageUri !== undefined) { setClauses.push(`processed_storage_uri = $${paramCount++}`); values.push(updates.processedStorageUri); } if (updates.metadata !== undefined) { setClauses.push(`metadata = $${paramCount++}`); values.push(JSON.stringify(updates.metadata)); } if (updates.status !== undefined) { setClauses.push(`status = $${paramCount++}`); values.push(updates.status); } if (updates.error !== undefined) { setClauses.push(`error = $${paramCount++}`); values.push(updates.error); } setClauses.push(`updated_at = CURRENT_TIMESTAMP`); values.push(id, orgId); await client.query( `UPDATE file_references SET ${setClauses.join(', ')} WHERE id = $${paramCount++} AND org_id = $${paramCount++}`, values ); const updated = await this.getFileReference({ id, orgId }); if (!updated) throw new Error('Failed to retrieve updated file reference'); return updated; } finally { client.release(); } } async listFileReferences(params?: { fileIds?: string[]; status?: FileStatus; limit?: number; offset?: number; orgId?: string }): Promise<{ items: FileReference[], total: number }> { const { fileIds, status, limit = 10, offset = 0, orgId = '' } = params || {}; const client = await this.pool.connect(); try { let whereClause = 'WHERE org_id = $1'; const queryParams: any[] = [orgId]; let paramCount = 2; if (fileIds && fileIds.length > 0) { whereClause += ` AND id = ANY($${paramCount++})`; queryParams.push(fileIds); } if (status) { whereClause += ` AND status = $${paramCount++}`; queryParams.push(status); } const countResult = await client.query( `SELECT COUNT(*) FROM file_references ${whereClause}`, queryParams ); const total = parseInt(countResult.rows[0].count); queryParams.push(limit, offset); const result = await client.query( `SELECT id, org_id, storage_uri, processed_storage_uri, metadata, status, error, created_at FROM file_references ${whereClause} ORDER BY created_at DESC LIMIT $${paramCount++} OFFSET $${paramCount++}`, queryParams ); const items = result.rows.map(row => ({ id: row.id, storageUri: row.storage_uri, processedStorageUri: row.processed_storage_uri || undefined, metadata: row.metadata, status: row.status, error: row.error || undefined, createdAt: row.created_at ? new Date(row.created_at) : undefined })); return { items, total }; } finally { client.release(); } } async deleteFileReference(params: { id: string; orgId?: string }): Promise<boolean> { const { id, orgId = '' } = params; const client = await this.pool.connect(); try { const result = await client.query( `DELETE FROM file_references WHERE id = $1 AND org_id = $2`, [id, orgId] ); return (result.rowCount || 0) > 0; } finally { client.release(); } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/superglue-ai/superglue'

If you have feedback or need assistance with the MCP directory API, please join our Discord server