Skip to main content
Glama
orneryd

M.I.M.I.R - Multi-agent Intelligent Memory & Insight Repository

by orneryd
orchestration-api.ts49.4 kB
/** * @module api/orchestration-api * @description Multi-agent orchestration API with workflow execution * * Provides HTTP endpoints for managing multi-agent workflows with * PM → Worker → QC agent chains. Supports workflow execution, monitoring, * and real-time progress updates via Server-Sent Events (SSE). * * **Features:** * - Workflow execution from JSON definitions * - Real-time progress updates via SSE * - Agent preamble generation (Agentinator) * - Execution state persistence * - Multi-agent coordination * * **Endpoints:** * - `POST /api/orchestration/execute` - Execute a workflow * - `GET /api/orchestration/status/:executionId` - Get execution status * - `GET /api/orchestration/sse/:executionId` - SSE stream for updates * - `POST /api/orchestration/generate-preamble` - Generate agent preambles * * @example * ```typescript * // Execute a workflow * fetch('/api/orchestration/execute', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify({ * workflow: { * name: 'Feature Implementation', * agents: [/* agent configs //] * } * }) * }); * ``` */ import { Router, Request, Response } from 'express'; import type { IGraphManager } from '../types/index.js'; import { CopilotAgentClient } from '../orchestrator/llm-client.js'; import { promises as fs } from 'fs'; import path from 'path'; import { fileURLToPath } from 'url'; import neo4j from 'neo4j-driver'; // Import modular orchestration components import { sendSSEEvent, registerSSEClient, unregisterSSEClient, } from './orchestration/sse.js'; import { generatePreambleWithAgentinator } from './orchestration/agentinator.js'; import { executeWorkflowFromJSON, executionStates, type ExecutionState, type Deliverable } from './orchestration/workflow-executor.js'; import { validateLambdaScript } from '../orchestrator/lambda-executor.js'; import { handleVectorSearchNodes } from '../tools/vectorSearch.tools.js'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); /** * Create Express router for orchestration API endpoints * * Provides HTTP endpoints for multi-agent orchestration, workflow execution, * and agent management. Includes endpoints for: * - Agent listing and search * - Workflow execution (PM → Workers → QC) * - Task management * - Agent preamble retrieval * - Vector search integration * * @param graphManager - Graph manager instance for Neo4j operations * @returns Configured Express router with all orchestration endpoints * * @example * ```ts * import express from 'express'; * import { GraphManager } from './managers/GraphManager.js'; * import { createOrchestrationRouter } from './api/orchestration-api.js'; * * const app = express(); * const graphManager = new GraphManager(driver); * * // Mount orchestration routes * app.use('/api', createOrchestrationRouter(graphManager)); * * // Available endpoints: * // GET /api/agents - List agents with search * // POST /api/execute-workflow - Execute multi-agent workflow * // GET /api/tasks/:id - Get task status * // POST /api/vector-search - Semantic search * ``` */ export function createOrchestrationRouter(graphManager: IGraphManager): Router { const router = Router(); /** * GET /api/agents - List agent preambles with search and pagination * * Retrieves agent preambles from the knowledge graph with optional text search. * Supports filtering by agent type (pm, worker, qc) and pagination. * * Query Parameters: * - search: Text search across name, role, and content * - limit: Maximum results to return (default: 20) * - offset: Number of results to skip (default: 0) * - type: Filter by agent type ('pm', 'worker', 'qc', 'all') * * @returns JSON with agents array, hasMore flag, and total count * * @example * // List all agents * fetch('/api/agents') * .then(res => res.json()) * .then(data => { * console.log('Found', data.agents.length, 'agents'); * data.agents.forEach(agent => { * console.log('-', agent.name, '(' + agent.agentType + ')'); * }); * }); * * @example * // Search for authentication-related agents * fetch('/api/agents?search=authentication&type=worker') * .then(res => res.json()) * .then(data => { * console.log('Auth workers:', data.agents); * }); * * @example * // Paginate through agents * async function loadAllAgents() { * let offset = 0; * const limit = 20; * const allAgents = []; * * while (true) { * const res = await fetch('/api/agents?limit=' + limit + '&offset=' + offset); * const data = await res.json(); * allAgents.push(...data.agents); * * if (!data.hasMore) break; * offset += limit; * } * * return allAgents; * } */ router.get('/agents', async (req: any, res: any) => { try { const { search, limit = 20, offset = 0, type = 'all' } = req.query; let agents: any[]; if (search && typeof search === 'string') { // Text-based search (case-insensitive) const driver = graphManager.getDriver(); const session = driver.session(); try { const searchLower = search.toLowerCase(); const limitInt = neo4j.int(Number(limit)); const offsetInt = neo4j.int(Number(offset)); const result = await session.run(` MATCH (n:Node) WHERE n.type = 'preamble' AND ($type = 'all' OR n.agentType = $type) AND ( toLower(n.name) CONTAINS $search OR toLower(n.role) CONTAINS $search OR toLower(n.content) CONTAINS $search ) RETURN n as node ORDER BY n.created DESC SKIP $offset LIMIT $limit `, { search: searchLower, limit: limitInt, offset: offsetInt, type }); agents = result.records.map((record: any) => { const props = record.get('node').properties; // Handle both old format (Neo4j label) and new format (Node properties) const agentType = props.agentType || props.agent_type || 'worker'; const roleDesc = props.roleDescription || props.role_description || props.role || ''; const name = props.name || roleDesc.split(' ').slice(0, 4).join(' ') || 'Unnamed Agent'; // Return only AgentTemplate fields for consistency with default agents return { id: props.id, name, role: roleDesc, agentType, content: props.content || '', version: props.version || '1.0', created: props.created || props.created_at, }; }); } finally { await session.close(); } } else { // Standard query without search - use direct Neo4j query to get full content const driver = graphManager.getDriver(); const session = driver.session(); try { const limitInt = neo4j.int(Number(limit)); const offsetInt = neo4j.int(Number(offset)); const result = await session.run(` MATCH (n:Node) WHERE n.type = 'preamble' AND ($type = 'all' OR n.agentType = $type) RETURN n as node ORDER BY n.created DESC SKIP $offset LIMIT $limit `, { limit: limitInt, offset: offsetInt, type }); agents = result.records.map((record: any) => { const props = record.get('node').properties; const agentType = props.agentType || props.agent_type || 'worker'; const roleDesc = props.roleDescription || props.role_description || props.role || ''; const name = props.name || roleDesc.split(' ').slice(0, 4).join(' ') || 'Unnamed Agent'; // Return only AgentTemplate fields for consistency with default agents return { id: props.id, name, role: roleDesc, agentType, content: props.content || '', version: props.version || '1.0', created: props.created || props.created_at, }; }); } finally { await session.close(); } } res.json({ agents, hasMore: agents.length === parseInt(limit as string), total: agents.length }); } catch (error) { console.error('Error fetching agents:', error); res.status(500).json({ error: 'Failed to fetch agents', details: error instanceof Error ? error.message : 'Unknown error' }); } }); /** * GET /api/agents/:id * Get specific agent preamble */ router.get('/agents/:id', async (req: any, res: any) => { try { const { id } = req.params; // Use direct Neo4j query to get full content (GraphManager strips large content) const driver = graphManager.getDriver(); const session = driver.session(); try { const result = await session.run(` MATCH (n:Node {id: $id}) WHERE n.type = 'preamble' RETURN n as node `, { id }); if (result.records.length === 0) { return res.status(404).json({ error: 'Agent not found' }); } const props = result.records[0].get('node').properties; // Return only AgentTemplate fields for consistency with default agents res.json({ id: props.id, name: props.name || 'Unnamed Agent', role: props.roleDescription || props.role || '', agentType: props.agentType || 'worker', content: props.content || '', version: props.version || '1.0', created: props.created || props.created_at, }); } finally { await session.close(); } } catch (error) { console.error('Error fetching agent:', error); res.status(500).json({ error: 'Failed to fetch agent', details: error instanceof Error ? error.message : 'Unknown error' }); } }); /** * POST /api/agents - Create new agent preamble using Agentinator * * Generates a specialized agent preamble from a role description. * Uses the Agentinator LLM to create contextual, task-specific instructions. * * Request Body: * - roleDescription: Description of agent's role and responsibilities (required) * - agentType: Type of agent ('worker', 'pm', 'qc') (default: 'worker') * - useAgentinator: Whether to use LLM generation (default: true) * * @returns JSON with created agent including id, name, role, content * * @example * // Create a worker agent for authentication * fetch('/api/agents', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify({ * roleDescription: 'Implement JWT-based authentication with refresh tokens', * agentType: 'worker', * useAgentinator: true * }) * }) * .then(res => res.json()) * .then(agent => { * console.log('Created agent:', agent.id); * console.log('Preamble length:', agent.content.length, 'chars'); * }); * * @example * // Create a QC agent for validation * const qcAgent = await fetch('/api/agents', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify({ * roleDescription: 'Validate API responses match OpenAPI spec', * agentType: 'qc' * }) * }).then(r => r.json()); * * console.log('QC Agent:', qcAgent.name); * * @example * // Create minimal agent without Agentinator * const simpleAgent = await fetch('/api/agents', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify({ * roleDescription: 'Simple task executor', * useAgentinator: false * }) * }).then(r => r.json()); */ router.post('/agents', async (req: any, res: any) => { try { const { roleDescription, agentType = 'worker', useAgentinator = true } = req.body; if (!roleDescription || typeof roleDescription !== 'string') { return res.status(400).json({ error: 'Role description is required' }); } let preambleContent = ''; let agentName = ''; let role = roleDescription; // Extract name from role description agentName = roleDescription.split(' ').slice(0, 4).join(' '); if (useAgentinator) { console.log(`🤖 Generating ${agentType} preamble with Agentinator...`); const generated = await generatePreambleWithAgentinator(roleDescription, agentType); agentName = generated.name; role = generated.role; preambleContent = generated.content; console.log(`✅ Generated preamble: ${agentName} (${preambleContent.length} chars)`); } else { // Create minimal preamble preambleContent = `# ${agentName} Agent\n\n` + `**Role:** ${roleDescription}\n\n` + `Execute tasks according to the role description above.\n`; } // Generate role hash for caching (MD5 of role description) const crypto = await import('crypto'); const roleHash = crypto.createHash('md5').update(roleDescription).digest('hex').substring(0, 8); // Store in Neo4j with full metadata const preambleNode = await graphManager.addNode('preamble', { name: agentName, role, agentType, content: preambleContent, version: '1.0', created: new Date().toISOString(), generatedBy: useAgentinator ? 'agentinator' : 'manual', roleDescription, roleHash, charCount: preambleContent.length, usedCount: 1, lastUsed: new Date().toISOString() }); res.json({ success: true, agent: { id: preambleNode.id, name: agentName, role, agentType, content: preambleContent, version: '1.0', created: preambleNode.created } }); } catch (error) { console.error('Error creating agent:', error); res.status(500).json({ error: 'Failed to create agent', details: error instanceof Error ? error.message : 'Unknown error' }); } }); /** * DELETE /api/agents/:id * Delete an agent preamble */ router.delete('/agents/:id', async (req: any, res: any) => { try { const { id } = req.params; console.log(`🗑️ DELETE request for agent: ${id}`); // Don't allow deleting default agents if (id.startsWith('default-')) { console.warn(`⚠️ Attempted to delete default agent: ${id}`); return res.status(403).json({ error: 'Cannot delete default agents' }); } // Check if agent exists first let agent: any; try { agent = await graphManager.getNode(id); } catch (getError: any) { console.error(`❌ Error getting agent ${id}:`, getError); return res.status(500).json({ error: 'Database error while checking agent', details: getError.message || 'Failed to query database' }); } if (!agent) { console.warn(`⚠️ Agent not found: ${id}`); return res.status(404).json({ error: 'Agent not found' }); } if (agent.type !== 'preamble') { console.warn(`⚠️ Node ${id} is not a preamble (type: ${agent.type})`); return res.status(404).json({ error: 'Agent not found' }); } // Delete the agent try { const deleted = await graphManager.deleteNode(id); if (!deleted) { console.warn(`⚠️ Agent ${id} was not deleted (returned false)`); return res.status(404).json({ error: 'Agent not found or already deleted' }); } console.log(`✅ Successfully deleted agent: ${id}`); res.json({ success: true }); } catch (deleteError: any) { console.error(`❌ Error deleting agent ${id}:`, deleteError); return res.status(500).json({ error: 'Database error while deleting agent', details: deleteError.message || 'Failed to delete from database' }); } } catch (error: any) { console.error('❌ Unexpected error deleting agent:', error); res.status(500).json({ error: 'Failed to delete agent', details: error.message || 'Unknown error' }); } }); /** * POST /api/generate-plan - Generate orchestration plan from project prompt * * Uses PM agent to analyze project requirements and generate a structured * task breakdown with agent assignments, dependencies, and deliverables. * * Request Body: * - prompt: Project description and requirements (required) * * @returns JSON with generated plan including tasks, agents, and workflow * * @example * // Generate plan for authentication system * fetch('/api/generate-plan', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify({ * prompt: 'Build a JWT authentication system with user registration, ' + * 'login, token refresh, and password reset functionality' * }) * }) * .then(res => res.json()) * .then(plan => { * console.log('Project:', plan.name); * console.log('Tasks:', plan.tasks.length); * plan.tasks.forEach(task => { * console.log('-', task.title, '(' + task.agentType + ')'); * }); * }); * * @example * // Generate plan with specific requirements * const plan = await fetch('/api/generate-plan', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify({ * prompt: `Create a REST API with: * - User CRUD operations * - PostgreSQL database * - OpenAPI documentation * - Unit and integration tests * - Docker deployment` * }) * }).then(r => r.json()); * * console.log('Generated', plan.tasks.length, 'tasks'); * console.log('Estimated duration:', plan.estimatedHours, 'hours'); */ router.post('/generate-plan', async (req: any, res: any) => { try { const { prompt } = req.body; if (!prompt || typeof prompt !== 'string') { return res.status(400).json({ error: 'Prompt is required' }); } console.log(`🔍 Performing semantic search on prompt: "${prompt.substring(0, 100)}..."`); // Perform vector search to get relevant context from Mimir let relevantContext = ''; let contextCount = 0; try { const searchResults = await handleVectorSearchNodes( { query: prompt, types: ['memory', 'orchestration_plan', 'orchestration_execution', 'file', 'concept'], // PM-relevant types limit: 8, min_similarity: 0.55 }, graphManager.getDriver() ); if (searchResults && searchResults.length > 0) { contextCount = searchResults.length; console.log(`✅ Found ${contextCount} relevant context items from knowledge base`); relevantContext = searchResults.map((result: any, idx: number) => { const type = result.type || 'unknown'; const title = result.title || result.id || 'Untitled'; const content = result.content || result.summary || ''; const truncated = content.substring(0, 1000); return `${idx + 1}. [${type.toUpperCase()}] ${title}\n ${truncated}${content.length > 1000 ? '...' : ''}`; }).join('\n\n'); } else { console.log('ℹ️ No relevant context found in knowledge base'); } } catch (searchError) { console.warn('⚠️ Vector search failed:', searchError); // Continue without context - don't fail the entire request } // Load PM agent preamble (JSON version) const pmPreamblePath = path.join(__dirname, '../../docs/agents/v2/01-pm-preamble-json.md'); const pmPreamble = await fs.readFile(pmPreamblePath, 'utf-8'); // Create PM agent client const pmAgent = new CopilotAgentClient({ preamblePath: pmPreamblePath, model: process.env.MIMIR_PM_MODEL || process.env.MIMIR_DEFAULT_MODEL || 'gpt-4.1', temperature: 0.2, // Lower temperature for structured output agentType: 'pm', }); // Load preamble await pmAgent.loadPreamble(pmPreamblePath); // Build user request with semantic context + repository context let userRequest = `${prompt}\n\n`; // Add relevant context from Mimir knowledge base if (relevantContext) { userRequest += `**RELEVANT CONTEXT FROM MIMIR KNOWLEDGE BASE:** (${contextCount} items retrieved via semantic search - use this to inform your task planning) ${relevantContext} --- `; } userRequest += `**REPOSITORY CONTEXT:** Project: Mimir - Graph-RAG TODO tracking with multi-agent orchestration Location: ${process.cwd()} **AVAILABLE TOOLS:** - read_file(path) - Read file contents - edit_file(path, content) - Create or modify files - run_terminal_cmd(command) - Execute shell commands - grep(pattern, path, options) - Search file contents - list_dir(path) - List directory contents - memory_node, memory_edge - Graph database operations **IMPORTANT:** Output ONLY valid JSON matching the ProjectPlan interface. No markdown, no explanations.`; console.log('🤖 Invoking PM Agent to generate task plan...'); if (contextCount > 0) { console.log(` 📚 Enriched with ${contextCount} context items from knowledge base`); } // Execute PM agent const result = await pmAgent.execute(userRequest); const response = result.output; // Parse JSON response let plan: any; try { // Extract JSON from response (in case there's any text before/after) const jsonMatch = response.match(/\{[\s\S]*\}/); if (!jsonMatch) { throw new Error('No JSON object found in PM agent response'); } plan = JSON.parse(jsonMatch[0]); // Validate required fields if (!plan.overview || !plan.tasks || !Array.isArray(plan.tasks)) { throw new Error('Invalid plan structure: missing required fields'); } console.log(`✅ PM Agent generated ${plan.tasks.length} tasks`); } catch (parseError) { console.error('Failed to parse PM agent response:', parseError); console.error('Raw response:', response.substring(0, 500)); // Return error with partial response for debugging return res.status(500).json({ error: 'Failed to parse PM agent response', details: parseError instanceof Error ? parseError.message : 'Invalid JSON', rawResponse: response.substring(0, 1000), }); } // Store the generated plan in Mimir for future reference await graphManager.addNode('memory', { type: 'orchestration_plan', title: `Plan: ${plan.overview.goal}`, content: JSON.stringify(plan, null, 2), prompt: prompt, category: 'orchestration', timestamp: new Date().toISOString(), taskCount: plan.tasks.length, }); res.json(plan); } catch (error) { console.error('Error generating plan:', error); res.status(500).json({ error: 'Failed to generate plan', details: error instanceof Error ? error.message : 'Unknown error', }); } }); /** * POST /api/save-plan - Save orchestration plan to knowledge graph * * Persists a generated task plan as a project node with task relationships. * Enables plan reuse, versioning, and historical tracking. * * Request Body: * - plan: Plan object with name, description, and tasks array * * @returns JSON with saved project node ID and task count * * @example * const plan = await fetch('/api/generate-plan', {}).then(r => r.json()); * const saved = await fetch('/api/save-plan', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify({ plan }) * }).then(r => r.json()); * console.log('Saved project:', saved.projectId); */ router.post('/save-plan', async (req: any, res: any) => { try { const { plan } = req.body; if (!plan) { return res.status(400).json({ error: 'Plan is required' }); } // Validate plan structure if (!Array.isArray(plan.tasks)) { return res.status(400).json({ error: 'Plan must contain a tasks array' }); } const tasks = plan.tasks as any[]; // Type-safe after validation // Create a project node const projectNode = await graphManager.addNode('project', { title: plan.overview.goal, complexity: plan.overview.complexity, totalTasks: plan.overview.totalTasks, estimatedDuration: plan.overview.estimatedDuration, estimatedToolCalls: plan.overview.estimatedToolCalls, reasoning: JSON.stringify(plan.reasoning), created: new Date().toISOString(), }); // Create task nodes and link to project const taskNodeIds: string[] = []; for (const task of tasks) { const taskNode = await graphManager.addNode('todo', { title: task.title, description: task.prompt, agentRole: task.agentRoleDescription, model: task.recommendedModel, status: 'pending', priority: 'medium', parallelGroup: task.parallelGroup, estimatedDuration: task.estimatedDuration, estimatedToolCalls: task.estimatedToolCalls, dependencies: JSON.stringify(task.dependencies), successCriteria: JSON.stringify(task.successCriteria), verificationCriteria: JSON.stringify(task.verificationCriteria), maxRetries: task.maxRetries, }); taskNodeIds.push(taskNode.id); // Link task to project await graphManager.addEdge(taskNode.id, projectNode.id, 'belongs_to', {}); } // Create dependency edges between tasks if (Array.isArray(tasks)) { for (let i = 0; i < tasks.length; i++) { const task = tasks[i]; const taskNodeId = taskNodeIds[i]; if (Array.isArray(task.dependencies)) { for (const depTaskId of task.dependencies) { const depIndex = tasks.findIndex((t: any) => t.id === depTaskId); if (depIndex !== -1) { await graphManager.addEdge(taskNodeId, taskNodeIds[depIndex], 'depends_on', {}); } } } } } res.json({ success: true, projectId: projectNode.id, taskIds: taskNodeIds, }); } catch (error) { console.error('Error saving plan:', error); res.status(500).json({ error: 'Failed to save plan', details: error instanceof Error ? error.message : 'Unknown error', }); } }); /** * GET /api/plans - List all saved orchestration plans * * Retrieves all project nodes from knowledge graph with task counts. * * @returns JSON array of saved plans with metadata * * @example * const plans = await fetch('/api/plans').then(r => r.json()); * plans.forEach(plan => { * console.log(plan.name, '-', plan.taskCount, 'tasks'); * }); */ router.get('/plans', async (req: any, res: any) => { try { const projects = await graphManager.queryNodes('project'); const plans = await Promise.all( projects.map(async (project) => { // Get all tasks linked to this project const neighbors = await graphManager.getNeighbors(project.id, 'belongs_to'); return { id: project.id, overview: { goal: project.properties?.title || 'Untitled', complexity: project.properties?.complexity || 'Medium', totalTasks: project.properties?.totalTasks || 0, estimatedDuration: project.properties?.estimatedDuration || 'TBD', estimatedToolCalls: project.properties?.estimatedToolCalls || 0, }, taskCount: neighbors.length, created: project.created, }; }) ); res.json({ plans }); } catch (error) { console.error('Error retrieving plans:', error); res.status(500).json({ error: 'Failed to retrieve plans', details: error instanceof Error ? error.message : 'Unknown error', }); } }); /** * GET /api/execution-stream/:executionId - Real-time execution progress via SSE * * Server-Sent Events stream providing live updates during workflow execution. * Emits events for task starts, completions, errors, and deliverables. * * @param executionId - Execution ID from execute-workflow response * @returns SSE stream with execution events * * @example * const eventSource = new EventSource('/api/execution-stream/exec-123'); * eventSource.onmessage = (e) => { * const event = JSON.parse(e.data); * console.log(event.type, event.message); * }; * eventSource.onerror = () => eventSource.close(); */ router.get('/execution-stream/:executionId', (req: any, res: any) => { const { executionId } = req.params; // Set SSE headers res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering // Register this client for SSE updates registerSSEClient(executionId, res); // Send initial state if execution exists const state = executionStates.get(executionId); if (state) { res.write(`event: init\ndata: ${JSON.stringify({ status: state.status, taskStatuses: state.taskStatuses, currentTaskId: state.currentTaskId })}\n\n`); } else { res.write(`event: init\ndata: ${JSON.stringify({ status: 'pending' })}\n\n`); } // Handle client disconnect req.on('close', () => { unregisterSSEClient(executionId, res); }); }); /** * POST /api/cancel-execution/:executionId * Cancel a running workflow execution */ router.post('/cancel-execution/:executionId', (req: any, res: any) => { const { executionId } = req.params; const state = executionStates.get(executionId); if (!state) { return res.status(404).json({ error: 'Execution not found', executionId }); } if (state.status !== 'running') { return res.status(400).json({ error: `Cannot cancel execution with status: ${state.status}`, executionId, status: state.status }); } // Set cancellation flag state.cancelled = true; state.status = 'cancelled'; console.log(`⛔ Cancellation requested for execution ${executionId}`); // Emit cancellation event to SSE clients sendSSEEvent(executionId, 'execution-cancelled', { executionId, cancelledAt: Date.now(), message: 'Execution cancelled by user', }); res.json({ success: true, executionId, message: 'Execution cancellation requested', }); }); /** * GET /api/execution-state/:executionId * Get current execution state * * Returns the current state of a running or completed execution, * including status and task statuses. * * @since 1.0.0 */ router.get('/execution-state/:executionId', (req: any, res: any) => { const { executionId } = req.params; const state = executionStates.get(executionId); if (!state) { return res.status(404).json({ error: 'Execution not found', executionId }); } res.json({ executionId, status: state.status, taskStatuses: state.taskStatuses, currentTaskId: state.currentTaskId, startTime: state.startTime, endTime: state.endTime, cancelled: state.cancelled || false, }); }); /** * GET /api/execution-deliverable/:executionId/:filename * Download a specific deliverable file from memory */ router.get('/execution-deliverable/:executionId/:filename', (req: any, res: any) => { const { executionId, filename } = req.params; const state = executionStates.get(executionId); if (!state) { return res.status(404).json({ error: 'Execution not found', executionId }); } const deliverable = state.deliverables.find(d => d.filename === filename); if (!deliverable) { return res.status(404).json({ error: 'Deliverable not found', executionId, filename, availableFiles: state.deliverables.map(d => d.filename) }); } console.log(`📥 Serving deliverable: ${filename} (${deliverable.size} bytes)`); // Set headers for file download res.setHeader('Content-Type', deliverable.mimeType); res.setHeader('Content-Disposition', `attachment; filename="${deliverable.filename}"`); res.setHeader('Content-Length', deliverable.size); res.send(deliverable.content); }); /** * GET /api/execution-deliverables/:executionId * List all deliverables for an execution */ router.get('/execution-deliverables/:executionId', (req: any, res: any) => { const { executionId } = req.params; const state = executionStates.get(executionId); if (!state) { return res.status(404).json({ error: 'Execution not found', executionId }); } res.json({ executionId, status: state.status, deliverables: state.deliverables.map(d => ({ filename: d.filename, size: d.size, mimeType: d.mimeType, downloadUrl: `/api/execution-deliverable/${executionId}/${encodeURIComponent(d.filename)}`, })), }); }); /** * GET /api/executions/:executionId * Get all task executions and telemetry for an execution run */ router.get('/executions/:executionId', async (req: any, res: any) => { try { const { executionId } = req.params; const driver = graphManager.getDriver(); const session = driver.session(); try { // Get execution summary const summaryResult = await session.run(` MATCH (exec:Node {id: $executionId, type: 'orchestration_execution'}) RETURN exec `, { executionId }); // Get all task executions const tasksResult = await session.run(` MATCH (te:Node) WHERE te.type = 'task_execution' AND te.executionId = $executionId RETURN te ORDER BY te.timestamp `, { executionId }); const summary = summaryResult.records.length > 0 ? summaryResult.records[0].get('exec').properties : null; const tasks = tasksResult.records.map((record: any) => record.get('te').properties); // Build taskExecutions array with node IDs const taskExecutions = tasks.map((task: any) => ({ nodeId: task.id, // The unique task execution node ID taskId: task.taskId, taskTitle: task.taskTitle, status: task.status, duration: task.duration?.toNumber() || 0, tokens: { input: task.tokensInput?.toNumber() || 0, output: task.tokensOutput?.toNumber() || 0, total: task.tokensTotal?.toNumber() || 0, }, toolCalls: task.toolCalls?.toNumber() || 0, qcPassed: task.qcPassed || false, qcScore: task.qcScore?.toNumber() || 0, timestamp: task.timestamp, })); res.json({ executionId, summary, tasks, taskExecutions, // New field with node IDs totalTasks: tasks.length, totalTokens: summary ? { input: summary.tokensInput?.toNumber() || 0, output: summary.tokensOutput?.toNumber() || 0, total: summary.tokensTotal?.toNumber() || 0, } : null, }); } finally { await session.close(); } } catch (error) { console.error('Error fetching execution:', error); res.status(500).json({ error: 'Failed to fetch execution', details: error instanceof Error ? error.message : 'Unknown error', }); } }); /** * GET /api/deliverables/:executionId/download * Download all deliverables as a zip archive * * Returns a zip file containing all deliverable files from an execution. * * @since 1.0.0 */ router.get('/deliverables/:executionId/download', async (req: any, res: any) => { try { const { executionId } = req.params; const state = executionStates.get(executionId); if (!state) { return res.status(404).json({ error: 'Execution not found', executionId, }); } if (state.deliverables.length === 0) { return res.status(404).json({ error: 'No deliverables found for this execution', executionId, }); } // Dynamically import archiver const archiver = (await import('archiver')).default; const archive = archiver('zip', { zlib: { level: 9 }, // Maximum compression }); // Set response headers for zip download res.setHeader('Content-Type', 'application/zip'); res.setHeader('Content-Disposition', `attachment; filename="execution-${executionId}-deliverables.zip"`); // Pipe archive to response archive.pipe(res); // Add all deliverable files to the archive for (const deliverable of state.deliverables) { if (deliverable.content) { archive.append(deliverable.content, { name: deliverable.filename }); } } // Finalize the archive await archive.finalize(); console.log(`✅ Delivered zip archive with ${state.deliverables.length} files for execution ${executionId}`); } catch (error) { console.error('Error creating deliverables zip:', error); if (!res.headersSent) { res.status(500).json({ error: 'Failed to create deliverables archive', details: error instanceof Error ? error.message : 'Unknown error', }); } } }); /** * GET /api/deliverables/:executionId * Get execution deliverables with node ID metadata * * Returns all deliverable files from an execution along with metadata * including the execution node ID and all task execution node IDs. * * @since 1.0.0 */ router.get('/deliverables/:executionId', async (req: any, res: any) => { try { const { executionId } = req.params; const state = executionStates.get(executionId); if (!state) { return res.status(404).json({ error: 'Execution not found', message: `No execution found with ID: ${executionId}`, }); } // Extract task execution node IDs from results const taskExecutionIds = state.results .filter(r => r.graphNodeId) .map(r => r.graphNodeId as string); res.json({ executionId, status: state.status, taskExecutionIds, deliverables: state.deliverables.map(d => ({ filename: d.filename, size: d.size, mimeType: d.mimeType, })), totalDeliverables: state.deliverables.length, metadata: { startTime: state.startTime, endTime: state.endTime, duration: state.endTime ? state.endTime - state.startTime : null, totalTasks: Object.keys(state.taskStatuses).length, }, }); } catch (error) { console.error('Error fetching deliverables:', error); res.status(500).json({ error: 'Failed to fetch deliverables', details: error instanceof Error ? error.message : 'Unknown error', }); } }); /** * GET /api/executions * List all executions with summary data */ router.get('/executions', async (req: any, res: any) => { try { const { limit = '50', offset = '0' } = req.query; const driver = graphManager.getDriver(); const session = driver.session(); try { const result = await session.run(` MATCH (exec:Node {type: 'orchestration_execution'}) RETURN exec ORDER BY exec.startTime DESC SKIP $offset LIMIT $limit `, { offset: neo4j.int(parseInt(offset as string)), limit: neo4j.int(parseInt(limit as string)), }); const executions = result.records.map((record: any) => { const props = record.get('exec').properties; const executionId = props.id; // Merge Neo4j data with in-memory deliverables const memoryState = executionStates.get(executionId); const deliverables = memoryState?.deliverables || []; return { id: executionId, executionId: executionId, planId: props.planId, status: props.status, startTime: props.startTime, endTime: props.endTime, duration: props.duration?.toNumber() || 0, tasksTotal: props.tasksTotal?.toNumber() || 0, tasksSuccessful: props.tasksSuccessful?.toNumber() || 0, tasksFailed: props.tasksFailed?.toNumber() || 0, tokensTotal: props.tokensTotal?.toNumber() || 0, toolCalls: props.toolCalls?.toNumber() || 0, deliverables: deliverables.map(d => ({ filename: d.filename, size: d.size, mimeType: d.mimeType, downloadUrl: `/api/execution-deliverable/${executionId}/${encodeURIComponent(d.filename)}`, })), }; }); res.json({ executions, returned: executions.length, offset: parseInt(offset as string), limit: parseInt(limit as string), }); } finally { await session.close(); } } catch (error) { console.error('Error listing executions:', error); res.status(500).json({ error: 'Failed to list executions', details: error instanceof Error ? error.message : 'Unknown error', }); } }); /** * POST /api/execute-workflow - Execute multi-agent workflow from Task Canvas * * Starts asynchronous execution of a task workflow with parallel agent execution. * Each task is assigned to an agent (PM/Worker/QC) and executed with filtered context. * Progress can be monitored via SSE stream at /api/execution-stream/:executionId. * * Request Body: * - tasks: Array of task objects with agent assignments and dependencies (required) * * @returns JSON with executionId for tracking progress * * @example * // Execute a simple workflow * fetch('/api/execute-workflow', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify({ * tasks: [ * { * id: 'task-1', * title: 'Design API schema', * agentType: 'worker', * requirements: 'Create OpenAPI 3.0 spec for user API', * dependencies: [] * }, * { * id: 'task-2', * title: 'Implement endpoints', * agentType: 'worker', * requirements: 'Implement REST endpoints from spec', * dependencies: ['task-1'] * }, * { * id: 'task-3', * title: 'Validate implementation', * agentType: 'qc', * requirements: 'Verify endpoints match spec', * dependencies: ['task-2'] * } * ] * }) * }) * .then(res => res.json()) * .then(data => { * console.log('Execution started:', data.executionId); * // Connect to SSE stream for progress * const eventSource = new EventSource('/api/execution-stream/' + data.executionId); * eventSource.onmessage = (e) => { * const update = JSON.parse(e.data); * console.log('Progress:', update.message); * }; * }); * * @example * // Execute workflow with parallel tasks * const workflow = { * tasks: [ * { id: 't1', title: 'Task 1', agentType: 'worker', requirements: '...', dependencies: [] }, * { id: 't2', title: 'Task 2', agentType: 'worker', requirements: '...', dependencies: [] }, * { id: 't3', title: 'Task 3', agentType: 'worker', requirements: '...', dependencies: ['t1', 't2'] } * ] * }; * * const response = await fetch('/api/execute-workflow', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify(workflow) * }).then(r => r.json()); * * console.log('Execution ID:', response.executionId); * // t1 and t2 execute in parallel, t3 waits for both */ router.post('/execute-workflow', async (req: any, res: any) => { try { const { tasks } = req.body; if (!tasks || !Array.isArray(tasks) || tasks.length === 0) { return res.status(400).json({ error: 'Invalid workflow: tasks array is required' }); } console.log(`📥 Received workflow execution request with ${tasks.length} tasks`); // Generate execution ID const executionId = `exec-${Date.now()}`; // Start execution asynchronously (don't wait for completion) // No file system access needed - everything stored in Neo4j executeWorkflowFromJSON(tasks, executionId, graphManager).catch(error => { console.error(`❌ Workflow execution ${executionId} failed:`, error); }); res.json({ success: true, executionId, message: `Workflow execution started with ${tasks.length} tasks`, }); } catch (error) { console.error('Error starting workflow execution:', error); res.status(500).json({ error: 'Failed to start workflow execution', details: error instanceof Error ? error.message : 'Unknown error', }); } }); /** * Validate Lambda Script * * @route POST /api/validate-lambda * @group Lambda - Lambda script management * @param {string} script.body.required - Lambda script source code * @param {string} language.body.required - Script language (typescript, javascript, python) * @returns {object} 200 - Validation result * @returns {object} 400 - Invalid request * * @example * const response = await fetch('/api/validate-lambda', { * method: 'POST', * headers: { 'Content-Type': 'application/json' }, * body: JSON.stringify({ * script: 'function transform(inputs, ctx) { return inputs.join("\\n"); }', * language: 'javascript' * }) * }).then(r => r.json()); * * console.log('Valid:', response.valid); */ router.post('/validate-lambda', async (req: any, res: any) => { try { const { script, language } = req.body; if (!script || typeof script !== 'string') { return res.status(400).json({ valid: false, errors: ['Script is required and must be a string'] }); } if (!language || !['typescript', 'javascript', 'python'].includes(language)) { return res.status(400).json({ valid: false, errors: ['Language must be one of: typescript, javascript, python'] }); } console.log(`📝 Validating ${language} Lambda script (${script.length} chars)`); const result = validateLambdaScript(script, language); if (result.valid) { console.log(`✅ Lambda script validation passed`); res.json({ valid: true, message: 'Lambda script is valid', compiledCode: result.compiledCode, // Return compiled JS for TS scripts }); } else { console.log(`❌ Lambda script validation failed:`, result.errors); res.json({ valid: false, errors: result.errors, }); } } catch (error) { console.error('Error validating Lambda script:', error); res.status(500).json({ valid: false, errors: [error instanceof Error ? error.message : 'Unknown error during validation'], }); } }); return router; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/orneryd/Mimir'

If you have feedback or need assistance with the MCP directory API, please join our Discord server