import { Integration, ServiceMetadata, Tool } from "@superglue/shared";
import { isMainThread, parentPort } from "worker_threads";
import { DataStore } from "../datastore/types.js";
import { DocumentationSearch } from "../documentation/documentation-search.js";
import { logMessage } from "../utils/logs.js";
import { isTokenExpired, refreshOAuthToken } from "../utils/oauth-token-refresh.js";
export interface DocumentationData {
content?: string;
openApiSchema?: string;
fetchedAt?: Date;
isFetched: boolean;
}
export class IntegrationManager {
// Core fields from Integration interface
// State management
id: string;
private _integration: Integration;
private _documentation: DocumentationData;
private dataStore: DataStore | null;
private metadata: ServiceMetadata;
private orgId: string; // Keep for backward compat
private _basicDataPromise?: Promise<Integration>;
private _documentationPromise?: Promise<DocumentationData>;
constructor(idOrIntegration: string | Integration, dataStore: DataStore | null, metadata: ServiceMetadata) {
this.dataStore = dataStore;
this.metadata = metadata;
this.orgId = metadata.orgId!; // Keep for backward compat
if (typeof idOrIntegration === 'string') {
// Initialize with just ID - will lazy load everything else
this.id = idOrIntegration;
this._documentation = {
isFetched: false
};
} else {
// Initialize with full integration object
this.id = idOrIntegration.id;
this._integration = idOrIntegration;
// Initialize documentation object
this._documentation = {
content: idOrIntegration.documentation,
openApiSchema: idOrIntegration.openApiSchema,
isFetched: !!idOrIntegration.documentation || !!idOrIntegration.openApiSchema,
fetchedAt: idOrIntegration.documentation ? new Date() : undefined
};
}
}
// Ensure basic data is loaded (without documentation)
async getIntegration(): Promise<Integration> {
if (this._integration) return this._integration;
// If datastore is null (e.g., in worker thread), return current state or throw
if (!this.dataStore) {
throw new Error(`Integration ${this.id} not initialized and datastore unavailable`);
}
// Prevent multiple simultaneous loads
if (this._basicDataPromise) {
await this._basicDataPromise;
return this._integration;
}
this._basicDataPromise = (async () => {
try {
const integration = await this.dataStore!.getIntegration({ id: this.id, includeDocs: false, orgId: this.orgId });
this._integration = integration;
return integration;
} finally {
this._basicDataPromise = undefined;
}
})();
await this._basicDataPromise;
return this._integration;
}
// Async method to ensure documentation is loaded
async getDocumentation(): Promise<DocumentationData> {
// If already fetched, return current state
if (this._documentation?.isFetched) {
return this._documentation;
}
// If datastore is null (e.g., in worker thread), return current state
if (!this.dataStore) {
return this._documentation || { isFetched: false };
}
try {
// Fetch the full integration with details from datastore
const fullIntegration = await this.dataStore.getIntegration({ id: this.id, includeDocs: true, orgId: this.orgId });
if (fullIntegration) {
this._documentation = {
content: fullIntegration.documentation,
openApiSchema: fullIntegration.openApiSchema,
isFetched: true,
fetchedAt: new Date()
};
this._integration = fullIntegration;
}
logMessage('info', `Documentation loaded for integration ${this.id}`, this.metadata);
} catch (error) {
logMessage('error', `Failed to load documentation for integration ${this.id}: ${error}`, this.metadata);
}
return this._documentation;
}
private searchCache = new Map<string, string>();
async searchDocumentation(instruction: string): Promise<string> {
if(this.searchCache.has(instruction)) {
return this.searchCache.get(instruction);
}
const documentation = await this.getDocumentation();
if(!documentation.openApiSchema && !documentation.content) {
return "no documentation provided";
}
const documentationSearch = new DocumentationSearch(this.metadata);
const result = documentationSearch.extractRelevantSections(documentation.content, instruction, 5, 4000, documentation.openApiSchema);
this.searchCache.set(instruction, result);
return result;
}
// Sync version of toIntegration for backward compatibility (may return incomplete data)
toIntegrationSync(): Integration {
return this._integration;
}
/**
* Refreshes the OAuth token if it's expired or about to expire
* @returns true if token was refreshed and saved, false otherwise
*/
async refreshTokenIfNeeded(): Promise<boolean> {
// Check if token needs refresh
await this.getIntegration();
if (!isTokenExpired(this._integration)) {
return false;
}
// Attempt to refresh the token
const refreshResult = await refreshOAuthToken(this._integration, this.metadata);
if (refreshResult.success) {
// update the credentials in the integration manager
this._integration.credentials = refreshResult.newCredentials;
if (!isMainThread && parentPort) {
parentPort.postMessage({
type: 'credential_update',
payload: {
integrationId: this.id,
orgId: this.orgId,
credentials: refreshResult.newCredentials
}
});
} else if (this.dataStore) {
await this.dataStore.upsertIntegration({
id: this.id,
integration: this._integration,
orgId: this.orgId
});
}
logMessage('info', `OAuth token refreshed for integration ${this.id}`, this.metadata);
} else {
logMessage('warn', `Failed to refresh OAuth token for integration ${this.id}`, this.metadata);
}
return refreshResult.success;
}
// Static factory method to create from Integration
static fromIntegration(integration: Integration, dataStore: DataStore, metadata: ServiceMetadata): IntegrationManager {
return new IntegrationManager(integration, dataStore, metadata);
}
// Static factory method to create from ID only
static async fromId(id: string, dataStore: DataStore, metadata: ServiceMetadata): Promise<IntegrationManager> {
const integration = await dataStore.getIntegration({ id, includeDocs: false, orgId: metadata.orgId! });
if (!integration) {
throw new Error(`Integration with id ${id} not found`);
}
return new IntegrationManager(integration, dataStore, metadata);
}
// Static method to create multiple instances
static fromIntegrations(integrations: Integration[], dataStore: DataStore, metadata: ServiceMetadata): IntegrationManager[] {
return integrations.map(i => IntegrationManager.fromIntegration(i, dataStore, metadata));
}
// Static method to create multiple instances from IDs
static async fromIds(ids: string[], dataStore: DataStore, metadata: ServiceMetadata): Promise<IntegrationManager[]> {
return Promise.all(ids.map(id => IntegrationManager.fromId(id, dataStore, metadata)));
}
static async forToolExecution(
tool: Tool,
dataStore: DataStore,
metadata: ServiceMetadata,
options: { includeDocs?: boolean } = {}
): Promise<IntegrationManager[]> {
const allIds = new Set<string>();
if (Array.isArray(tool.integrationIds)) {
tool.integrationIds.forEach(id => allIds.add(id));
}
if (Array.isArray(tool.steps)) {
tool.steps.forEach(step => {
if (step.integrationId) {
allIds.add(step.integrationId);
}
});
}
if (allIds.size === 0) {
return [];
}
const integrations = await dataStore.getManyIntegrations({
ids: Array.from(allIds),
includeDocs: options.includeDocs ?? false,
orgId: metadata.orgId
});
const managers = integrations.map(i => new IntegrationManager(i, dataStore, metadata));
for (const manager of managers) {
await manager.refreshTokenIfNeeded();
}
return managers;
}
}