atlas-mcp-server

importLogic.ts (13.2 kB)
```typescript
/**
 * @fileoverview Implements the database import logic for Neo4j.
 * @module src/services/neo4j/backupRestoreService/importLogic
 */
import { existsSync, readdirSync, readFileSync } from "fs";
import { stat } from "fs/promises";
import { Session } from "neo4j-driver";
import path from "path";
import { logger, requestContextService } from "../../../utils/index.js";
import { neo4jDriver } from "../driver.js";
import { escapeRelationshipType } from "../helpers.js";
import { FullExport } from "./backupRestoreTypes.js";
import { secureResolve, validatedBackupRoot } from "./backupUtils.js";

/**
 * Imports data from JSON files, overwriting the existing database.
 * Can import from either full-export.json (if it exists) or individual entity files.
 * @param backupDirInput The path to the directory containing the backup JSON files.
 * @throws Error if any step fails or if the backup directory is invalid.
 */
export const _importDatabase = async (
  backupDirInput: string,
): Promise<void> => {
  // Resolve the requested directory and refuse anything outside the validated backup root.
  const backupDir = secureResolve(
    validatedBackupRoot,
    path.relative(validatedBackupRoot, path.resolve(backupDirInput)),
  );
  if (!backupDir) {
    throw new Error(
      `Invalid backup directory provided: "${backupDirInput}". It must be within "${validatedBackupRoot}".`,
    );
  }

  try {
    const stats = await stat(backupDir);
    if (!stats.isDirectory()) {
      throw new Error(
        `Backup path "${backupDir}" exists but is not a directory.`,
      );
    }
  } catch (error: any) {
    if (error.code === "ENOENT") {
      throw new Error(`Backup directory "${backupDir}" does not exist.`);
    }
    throw new Error(
      `Failed to access backup directory "${backupDir}": ${error.message}`,
    );
  }

  const operationName = "_importDatabase";
  const baseContext = requestContextService.createRequestContext({
    operation: operationName,
    importDir: backupDir,
  });
  let session: Session | null = null;

  logger.warning(
    `Starting database import from validated directory ${backupDir}. THIS WILL OVERWRITE ALL EXISTING DATA.`,
    baseContext,
  );

  try {
    session = await neo4jDriver.getSession();

    // Wipe the existing database before importing anything.
    logger.info("Clearing existing database...", baseContext);
    await session.executeWrite(async (tx) => {
      logger.debug("Executing clear database transaction...", baseContext);
      await tx.run("MATCH (n) DETACH DELETE n");
      logger.debug("Clear database transaction executed.", baseContext);
    });
    logger.info("Existing database cleared.", baseContext);

    let relationships: Array<{
      startNodeId: string;
      endNodeId: string;
      type: string;
      properties: Record<string, any>;
    }> = [];

    const fullExportPath = secureResolve(backupDir, "full-export.json");

    // Prefer the consolidated full-export.json when it exists.
    if (fullExportPath && existsSync(fullExportPath)) {
      logger.info(
        `Found full-export.json at ${fullExportPath}. Using consolidated import.`,
        { ...baseContext, filePath: fullExportPath },
      );
      const fullExportContent = readFileSync(fullExportPath, "utf-8");
      const fullExport: FullExport = JSON.parse(fullExportContent);

      for (const label in fullExport.nodes) {
        if (Object.prototype.hasOwnProperty.call(fullExport.nodes, label)) {
          const nodesToImport = fullExport.nodes[label];
          if (!nodesToImport || nodesToImport.length === 0) {
            logger.info(`No ${label} nodes to import from full-export.json.`, {
              ...baseContext,
              label,
            });
            continue;
          }
          logger.debug(
            `Importing ${nodesToImport.length} ${label} nodes from full-export.json`,
            { ...baseContext, label, count: nodesToImport.length },
          );
          const escapedLabel = `\`${label.replace(/`/g, "``")}\``;
          const query = `UNWIND $nodes as nodeProps CREATE (n:${escapedLabel}) SET n = nodeProps`;
          await session.executeWrite(async (tx) => {
            logger.debug(
              `Executing node creation transaction for label ${label} (full-export)...`,
              { ...baseContext, label },
            );
            await tx.run(query, { nodes: nodesToImport });
            logger.debug(
              `Node creation transaction for label ${label} (full-export) executed.`,
              { ...baseContext, label },
            );
          });
          logger.info(
            `Successfully imported ${nodesToImport.length} ${label} nodes from full-export.json`,
            { ...baseContext, label, count: nodesToImport.length },
          );
        }
      }

      if (fullExport.relationships && fullExport.relationships.length > 0) {
        logger.info(
          `Found ${fullExport.relationships.length} relationships in full-export.json.`,
          { ...baseContext, count: fullExport.relationships.length },
        );
        relationships = fullExport.relationships;
      } else {
        logger.info(`No relationships found in full-export.json.`, baseContext);
      }
    } else {
      // Fall back to individual entity files; the label is inferred from the
      // (possibly pluralized) file name, e.g. "projects.json" -> "Project".
      logger.info(
        `No full-export.json found or path invalid. Using individual entity files from ${backupDir}.`,
        baseContext,
      );
      const filesInBackupDir = readdirSync(backupDir);
      const nodeFiles = filesInBackupDir.filter(
        (file) =>
          file.toLowerCase().endsWith(".json") &&
          file !== "relationships.json" &&
          file !== "full-export.json",
      );

      for (const nodeFile of nodeFiles) {
        const filePath = secureResolve(backupDir, nodeFile);
        if (!filePath) {
          logger.warning(
            `Skipping potentially insecure node file path: ${nodeFile} in ${backupDir}`,
            { ...baseContext, nodeFile },
          );
          continue;
        }
        const inferredLabelFromFile = path.basename(nodeFile, ".json");
        const label = inferredLabelFromFile.endsWith("s")
          ? inferredLabelFromFile.charAt(0).toUpperCase() +
            inferredLabelFromFile.slice(1, -1)
          : inferredLabelFromFile.charAt(0).toUpperCase() +
            inferredLabelFromFile.slice(1);

        if (!existsSync(filePath)) {
          logger.warning(
            `Node file ${nodeFile} (inferred label ${label}) not found at ${filePath}. Skipping.`,
            { ...baseContext, nodeFile, label, filePath },
          );
          continue;
        }

        logger.debug(
          `Importing nodes with inferred label: ${label} from ${filePath}`,
          { ...baseContext, label, filePath },
        );
        const fileContent = readFileSync(filePath, "utf-8");
        const nodesToImport: Record<string, any>[] = JSON.parse(fileContent);

        if (nodesToImport.length === 0) {
          logger.info(`No ${label} nodes to import from ${filePath}.`, {
            ...baseContext,
            label,
            filePath,
          });
          continue;
        }

        const escapedLabel = `\`${label.replace(/`/g, "``")}\``;
        const query = `UNWIND $nodes as nodeProps CREATE (n:${escapedLabel}) SET n = nodeProps`;
        await session.executeWrite(async (tx) => {
          logger.debug(
            `Executing node creation transaction for label ${label} (individual file)...`,
            { ...baseContext, label },
          );
          await tx.run(query, { nodes: nodesToImport });
          logger.debug(
            `Node creation transaction for label ${label} (individual file) executed.`,
            { ...baseContext, label },
          );
        });
        logger.info(
          `Successfully imported ${nodesToImport.length} ${label} nodes from ${filePath}`,
          { ...baseContext, label, count: nodesToImport.length, filePath },
        );
      }

      const relFilePath = secureResolve(backupDir, "relationships.json");
      if (relFilePath && existsSync(relFilePath)) {
        logger.info(`Importing relationships from ${relFilePath}...`, {
          ...baseContext,
          filePath: relFilePath,
        });
        const relFileContent = readFileSync(relFilePath, "utf-8");
        relationships = JSON.parse(relFileContent);
        if (relationships.length === 0) {
          logger.info(`No relationships found to import in ${relFilePath}.`, {
            ...baseContext,
            filePath: relFilePath,
          });
        }
      } else {
        logger.warning(
          `Relationships file not found or path invalid: ${relFilePath}. Skipping relationship import.`,
          { ...baseContext, filePath: relFilePath },
        );
      }
    }

    // Create relationships in batches, grouped by type so the (escaped) type
    // can be interpolated into the CREATE clause.
    if (relationships.length > 0) {
      logger.info(
        `Attempting to import ${relationships.length} relationships...`,
        { ...baseContext, totalRelationships: relationships.length },
      );
      let importedCount = 0;
      let failedCount = 0;

      const relationshipsByType: Record<
        string,
        Array<{
          startNodeId: string;
          endNodeId: string;
          properties: Record<string, any>;
        }>
      > = {};

      for (const rel of relationships) {
        if (!rel.startNodeId || !rel.endNodeId || !rel.type) {
          logger.warning(
            `Skipping relationship due to missing critical data (startNodeId, endNodeId, or type): ${JSON.stringify(rel)}`,
            { ...baseContext, relationshipData: rel },
          );
          failedCount++;
          continue;
        }
        if (!relationshipsByType[rel.type]) {
          relationshipsByType[rel.type] = [];
        }
        relationshipsByType[rel.type].push({
          startNodeId: rel.startNodeId,
          endNodeId: rel.endNodeId,
          properties: rel.properties || {},
        });
      }

      const batchSize = 500;
      for (const relType in relationshipsByType) {
        if (
          Object.prototype.hasOwnProperty.call(relationshipsByType, relType)
        ) {
          const relsOfType = relationshipsByType[relType];
          const escapedType = escapeRelationshipType(relType);
          logger.debug(
            `Processing ${relsOfType.length} relationships of type ${relType} (escaped: ${escapedType})`,
            { ...baseContext, relType, count: relsOfType.length },
          );

          for (let i = 0; i < relsOfType.length; i += batchSize) {
            const batch = relsOfType.slice(i, i + batchSize);
            const batchNumber = i / batchSize + 1;
            logger.debug(
              `Processing batch ${batchNumber} for type ${relType} (size: ${batch.length})`,
              { ...baseContext, relType, batchNumber, batchSize: batch.length },
            );
            const relQuery = `
              UNWIND $rels AS relData
              MATCH (start {id: relData.startNodeId})
              MATCH (end {id: relData.endNodeId})
              CREATE (start)-[r:${escapedType}]->(end)
              SET r = relData.properties
              RETURN count(r) as createdCount
            `;
            try {
              const result = await session.executeWrite(async (tx) => {
                logger.debug(
                  `Executing UNWIND transaction for type ${relType}, batch ${batchNumber}`,
                  { ...baseContext, relType, batchNumber },
                );
                const txResult = await tx.run(relQuery, { rels: batch });
                logger.debug(
                  `UNWIND transaction executed for type ${relType}, batch ${batchNumber}`,
                  { ...baseContext, relType, batchNumber },
                );
                return txResult.records[0]?.get("createdCount").toNumber() || 0;
              });
              importedCount += result;
              logger.debug(
                `Successfully created ${result} relationships of type ${relType} in batch ${batchNumber}`,
                { ...baseContext, relType, batchNumber, count: result },
              );
            } catch (error) {
              const errorMsg =
                error instanceof Error ? error.message : String(error);
              logger.error(
                `Failed to create relationships of type ${relType} in batch ${batchNumber}: ${errorMsg}`,
                error as Error,
                {
                  ...baseContext,
                  relType,
                  batchNumber,
                  batchDataSample: batch.slice(0, 5),
                },
              );
              failedCount += batch.length;
            }
          }
        }
      }
      logger.info(
        `Relationship import summary: Attempted=${relationships.length}, Succeeded=${importedCount}, Failed=${failedCount}`,
        {
          ...baseContext,
          attempted: relationships.length,
          succeeded: importedCount,
          failed: failedCount,
        },
      );
    } else {
      logger.info("No relationships to import.", baseContext);
    }

    logger.info("Database import completed successfully.", baseContext);
  } catch (error) {
    const errorMessage =
      error instanceof Error ? error.message : String(error);
    logger.error(
      `Database import failed: ${errorMessage}`,
      error as Error,
      baseContext,
    );
    throw new Error(`Database import failed: ${errorMessage}`);
  } finally {
    if (session) {
      await session.close();
    }
  }
};
```
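For reference, the shape of the backup payload can be inferred from how the code above consumes it: `full-export.json` holds node property maps grouped by label plus a flat relationship list, while the fallback per-entity files each hold a bare array of node property maps. Below is a minimal sketch of that contract; the authoritative `FullExport` type lives in `backupRestoreTypes.ts`, and the example file names are hypothetical, chosen only to illustrate the pluralized-filename label inference.

```typescript
// Sketch of the backup file shapes inferred from _importDatabase above.
// Field names are taken from how the parsed JSON is read; this is not the
// canonical definition (see backupRestoreTypes.ts).

/** Shape of full-export.json (consolidated backup). */
interface FullExportSketch {
  /** Node property maps keyed by label, e.g. { Project: [{ id: "p1", ... }] }. */
  nodes: Record<string, Record<string, unknown>[]>;
  /** Flat relationship list; endpoints are matched by each node's `id` property. */
  relationships: Array<{
    startNodeId: string;
    endNodeId: string;
    type: string;
    properties?: Record<string, unknown>;
  }>;
}

// Fallback layout when full-export.json is absent (file names illustrative):
//   <backupDir>/projects.json       -> nodes labeled "Project" (trailing "s" stripped)
//   <backupDir>/tasks.json          -> nodes labeled "Task"
//   <backupDir>/relationships.json  -> same shape as FullExportSketch["relationships"]
```

Note that relationship endpoints are matched purely on the `id` property, so node files must be imported with their original `id` values intact or relationship creation will silently match nothing.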
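Since the export is underscore-prefixed, `_importDatabase` is presumably wrapped by a public entry point elsewhere in `backupRestoreService`. A hypothetical caller might look like the following; the wrapper name is illustrative and not taken from the source.

```typescript
import { _importDatabase } from "./importLogic.js";

// Hypothetical restore entry point. _importDatabase wipes the database before
// importing, so treat this as a destructive, last-resort operation.
export const restoreFromBackup = async (backupDir: string): Promise<void> => {
  try {
    // Throws if backupDir resolves outside validatedBackupRoot.
    await _importDatabase(backupDir);
  } catch (error) {
    // _importDatabase has already logged the failure; re-wrap for the caller.
    throw new Error(
      `Restore failed for "${backupDir}": ${
        error instanceof Error ? error.message : String(error)
      }`,
    );
  }
};
```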
