Graph Ingest
graph_ingestQueue a document for background extraction into a memory graph, or check the ingest backlog. Use for files that don't need immediate reflection in conversation.
Instructions
Queue a document for asynchronous extraction into the memory graph (mode='queue'), or check the ingest backlog (mode='status'). Use this when you have a file the user wants summarized into the graph but doesn't need it reflected in the same conversation — the nightly dream process picks queued documents up. For inline assertions during a conversation, call graph_relate directly instead. Idempotent: queueing the same file twice overwrites the prior copy in the pending dir.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| action | Yes | queue: add file to pending. status: check queue. | |
| file_path | No | Path to file to queue (required for queue action) | |
| meta | No | Optional metadata for the queued document |
Implementation Reference
- src/mcp-server/index.ts:450-527 (registration)Registration and handler for the graph_ingest tool. The tool supports two actions: 'queue' (copy a file into the ingest/pending directory with optional metadata) and 'status' (list pending and recently completed ingest files). The handler is self-contained with no calls to Neo4jClient.
server.registerTool("graph_ingest", { title: "Graph Ingest", description: "Queue a document for asynchronous extraction into the memory graph (mode='queue'), or check the ingest backlog (mode='status'). Use this when you have a file the user wants summarized into the graph but doesn't need it reflected in the same conversation — the nightly dream process picks queued documents up. For inline assertions during a conversation, call graph_relate directly instead. Idempotent: queueing the same file twice overwrites the prior copy in the pending dir.", inputSchema: { action: z.enum(["queue", "status"]).describe("queue: add file to pending. status: check queue."), file_path: z.string().optional().describe("Path to file to queue (required for queue action)"), meta: z.object({ source: z.string().optional(), author: z.string().optional(), date: z.string().optional(), topic_hints: z.array(z.string()).optional(), weight_override: z.number().optional(), }).optional().describe("Optional metadata for the queued document"), }, annotations: { idempotentHint: true }, }, async (args) => { try { const pendingDir = join(GRAPH_MEMORY_HOME, "ingest", "pending"); const completedDir = join(GRAPH_MEMORY_HOME, "ingest", "completed"); if (args.action === "status") { let pending: Array<{ file: string; queued_at: string; size: string }> = []; let recentlyCompleted: Array<{ file: string; processed_at: string }> = []; try { const pendingFiles = readdirSync(pendingDir).filter((f) => !f.endsWith(".meta.json")); pending = pendingFiles.map((f) => { const stat = statSync(join(pendingDir, f)); return { file: f, queued_at: stat.mtime.toISOString(), size: `${(stat.size / 1024).toFixed(1)} KB`, }; }); } catch { /* dir doesn't exist yet */ } try { const completedFiles = readdirSync(completedDir).filter((f) => !f.endsWith(".meta.json")); recentlyCompleted = completedFiles.slice(-5).map((f) => { const stat = statSync(join(completedDir, f)); return { file: f, processed_at: stat.mtime.toISOString() }; }); } catch { /* dir doesn't exist yet */ } return toolResult({ pending, recently_completed: recentlyCompleted, pending_count: pending.length, completed_count: recentlyCompleted.length, }); } // Queue action if (!args.file_path) { return toolError("file_path is required for queue action"); } mkdirSync(pendingDir, { recursive: true }); const destName = basename(args.file_path); const destPath = join(pendingDir, destName); copyFileSync(args.file_path, destPath); if (args.meta) { const metaPath = join(pendingDir, `${destName}.meta.json`); writeFileSync(metaPath, JSON.stringify(args.meta, null, 2)); } return toolResult({ action: "queued", file: destName, destination: destPath, meta_written: !!args.meta, }); } catch (err) { return toolError(`graph_ingest failed: ${err instanceof Error ? err.message : String(err)}`); } }); - src/mcp-server/index.ts:454-464 (schema)Input schema for graph_ingest. Accepts action (queue/status), optional file_path, and optional meta object with source, author, date, topic_hints, and weight_override.
inputSchema: { action: z.enum(["queue", "status"]).describe("queue: add file to pending. status: check queue."), file_path: z.string().optional().describe("Path to file to queue (required for queue action)"), meta: z.object({ source: z.string().optional(), author: z.string().optional(), date: z.string().optional(), topic_hints: z.array(z.string()).optional(), weight_override: z.number().optional(), }).optional().describe("Optional metadata for the queued document"), },