Skip to main content
Glama
index.ts • 33.7 kB
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
  ListPromptsRequestSchema,
  GetPromptRequestSchema,
  Tool,
  Resource,
  Prompt,
} from "@modelcontextprotocol/sdk/types.js";
import axios, { AxiosInstance, AxiosError } from "axios";

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

/** Conductor API client configuration. */
interface ConductorConfig {
  baseUrl: string;
  apiPath?: string;
}

// `||` (not `??`) is deliberate: an empty environment variable falls back to
// the default as well as an unset one.
const config: ConductorConfig = {
  baseUrl: process.env.CONDUCTOR_SERVER_URL || "http://localhost:8080",
  apiPath: process.env.CONDUCTOR_API_PATH || "/api",
};

/** Shared axios instance; every request is relative to `<baseUrl><apiPath>`. */
const conductorClient: AxiosInstance = axios.create({
  baseURL: `${config.baseUrl}${config.apiPath}`,
  headers: {
    "Content-Type": "application/json",
  },
  timeout: 30000,
});

// ---------------------------------------------------------------------------
// Argument handling
// ---------------------------------------------------------------------------

/** Untyped argument bag as received from an MCP tool or prompt request. */
type Args = Record<string, unknown>;

/**
 * Raised when a request argument is missing or malformed. The tool handler
 * reports it as a "Validation Error" result instead of contacting Conductor.
 */
class ValidationError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ValidationError";
    // Keeps `instanceof` working even if the build targets ES5.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

/** True for plain (non-null, non-array) objects. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

/**
 * Returns `args[key]` as a string.
 * @throws ValidationError when the value is absent, not a string, or blank.
 */
function requireString(args: Args, key: string, label: string = key): string {
  const value = args[key];
  if (typeof value !== "string" || value.trim() === "") {
    throw new ValidationError(`${label} is required`);
  }
  return value;
}

/**
 * Returns `args[key]` as a plain object.
 * @throws ValidationError when the value is absent or not an object.
 */
function requireObject(args: Args, key: string, label: string = key): Record<string, unknown> {
  const value = args[key];
  if (!isRecord(value)) {
    throw new ValidationError(`${label} is required and must be a JSON object`);
  }
  return value;
}

/** Returns `args[key]` when it is a non-empty string, otherwise `undefined`. */
function optionalString(args: Args, key: string): string | undefined {
  const value = args[key];
  return typeof value === "string" && value !== "" ? value : undefined;
}

/**
 * Returns `args[key]` as a finite number, otherwise `undefined`. Numeric
 * strings are accepted because prompt arguments always arrive as strings.
 */
function optionalNumber(args: Args, key: string): number | undefined {
  const value = args[key];
  if (typeof value === "number") {
    return Number.isFinite(value) ? value : undefined;
  }
  if (typeof value === "string" && value.trim() !== "") {
    const parsed = Number(value);
    return Number.isFinite(parsed) ? parsed : undefined;
  }
  return undefined;
}

/** Returns `args[key]` when it is a boolean, otherwise `fallback`. */
function optionalBoolean(args: Args, key: string, fallback: boolean): boolean {
  const value = args[key];
  return typeof value === "boolean" ? value : fallback;
}

/** Escapes a user-supplied value for use as a single URL path segment. */
function pathSegment(value: string): string {
  return encodeURIComponent(value);
}

// ---------------------------------------------------------------------------
// Result helpers
// ---------------------------------------------------------------------------

/** Wraps plain text in the MCP tool-result envelope. */
function textResult(text: string) {
  return { content: [{ type: "text" as const, text }] };
}

/** Wraps a value, pretty-printed as JSON, in the MCP tool-result envelope. */
function jsonResult(data: unknown) {
  return textResult(JSON.stringify(data, null, 2));
}

/** Wraps plain text in an MCP tool-result envelope flagged as an error. */
function errorResult(text: string) {
  return { ...textResult(text), isError: true };
}

/** Wraps text as a single user message for a prompt response. */
function userPrompt(text: string) {
  return {
    messages: [{ role: "user" as const, content: { type: "text" as const, text } }],
  };
}

// ---------------------------------------------------------------------------
// Error formatting
// ---------------------------------------------------------------------------

/** Reads a non-empty string `message` property from an arbitrary value. */
function messageOf(value: unknown): string | undefined {
  if (typeof value === "object" && value !== null && "message" in value) {
    const { message } = value as { message?: unknown };
    if (typeof message === "string" && message !== "") {
      return message;
    }
  }
  return undefined;
}

/**
 * Picks a remediation hint for a failed Conductor request.
 *
 * Network-level failures (no HTTP response) are classified by `error.code`;
 * HTTP failures are classified by status. Returns "" when no hint applies.
 */
function suggestionFor(error: AxiosError, message: string): string {
  const status = error.response?.status;

  if (status === undefined) {
    if (error.code === "ECONNREFUSED") {
      return `Cannot connect to Conductor server at ${config.baseUrl}. Please verify the server is running and the URL is correct.`;
    }
    // axios reports its own `timeout` expiry as ECONNABORTED by default;
    // ETIMEDOUT comes from the socket layer.
    if (error.code === "ETIMEDOUT" || error.code === "ECONNABORTED") {
      return "Request timed out. The server may be slow or unresponsive.";
    }
    return "";
  }

  switch (status) {
    case 404:
      return "The resource was not found. Please verify the ID/name is correct.";
    case 400:
      return "Invalid request. Please check the input parameters.";
    case 500:
      return message.includes("Elasticsearch")
        ? "Elasticsearch is not configured or unavailable. Try using list_workflows instead of search_workflows."
        : "Server error. The Conductor server may be experiencing issues.";
    case 503:
      return "Service unavailable. The Conductor server may be down or restarting.";
    default:
      return "";
  }
}

/**
 * Formats any thrown value as a user-facing error line, with a suggestion
 * appended for recognised axios failures.
 *
 * @param error    The caught value (axios error, `Error`, or anything else).
 * @param toolName Name of the tool/operation, included in the message.
 * @returns A single string starting with "❌ Error executing <toolName>:".
 */
function formatError(error: unknown, toolName: string): string {
  if (!axios.isAxiosError(error)) {
    return `❌ Error executing ${toolName}: ${messageOf(error) ?? "Unknown error"}`;
  }

  const message = messageOf(error.response?.data) ?? messageOf(error) ?? "Unknown error";
  // Label with the real HTTP status when there is one; otherwise with the
  // transport error code, so a refused connection is not reported as "[500]".
  const label = error.response ? String(error.response.status) : error.code ?? "NO_RESPONSE";
  const hint = suggestionFor(error, message);
  const suggestion = hint === "" ? "" : `\n\n💡 Suggestion: ${hint}`;

  return `❌ Error executing ${toolName}: [${label}] ${message}${suggestion}`;
}

// ---------------------------------------------------------------------------
// Workflow helpers
// ---------------------------------------------------------------------------

const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

/**
 * Checks that a workflow ID is a non-blank, UUID-formatted string.
 *
 * @returns `{ valid: true }` or `{ valid: false, error }` with a description.
 */
function validateWorkflowId(workflowId: string): { valid: boolean; error?: string } {
  // Runtime guard: arguments originate from untyped JSON.
  if (typeof workflowId !== "string" || workflowId.trim() === "") {
    return { valid: false, error: "Workflow ID cannot be empty" };
  }

  if (!UUID_PATTERN.test(workflowId)) {
    return {
      valid: false,
      error: `Invalid workflow ID format. Expected UUID format, got: ${workflowId}`,
    };
  }

  return { valid: true };
}

/**
 * Renders epoch milliseconds as an ISO-8601 string.
 *
 * @returns "N/A" for a missing, zero, or unrepresentable timestamp.
 */
function formatTimestamp(epochMs?: number): string {
  if (!epochMs) return "N/A";
  const date = new Date(epochMs);
  // toISOString() throws RangeError on an invalid Date.
  return Number.isNaN(date.getTime()) ? "N/A" : date.toISOString();
}

/** Fields of a workflow execution that the summary reads. */
interface WorkflowSummarySource {
  workflowName?: string;
  workflowType?: string;
  version?: number;
  workflowVersion?: number;
  workflowId?: string;
  status?: string;
  startTime?: number;
  endTime?: number;
  failedTaskNames?: string[];
}

/**
 * Builds a short multi-line, human-readable summary of a workflow execution:
 * name/version, ID, status, start time, duration, and (for FAILED workflows)
 * the failed task names.
 */
function summarizeWorkflow(workflow: WorkflowSummarySource): string {
  const { startTime, endTime } = workflow;

  let duration = "N/A";
  if (typeof startTime === "number" && startTime > 0) {
    // A falsy endTime (0 or absent) is treated as "still running".
    const elapsedMs = (endTime || Date.now()) - startTime;
    duration = `${Math.floor(elapsedMs / 1000)}s`;
  }

  const lines = [
    `📋 Workflow: ${workflow.workflowName || workflow.workflowType} (v${workflow.version ?? workflow.workflowVersion})`,
    `🆔 ID: ${workflow.workflowId}`,
    `📊 Status: ${workflow.status}`,
    `⏱️ Started: ${formatTimestamp(startTime)}`,
    `⏳ Duration: ${duration}`,
  ];
  if (workflow.status === "FAILED") {
    lines.push(`❌ Failed Tasks: ${workflow.failedTaskNames?.join(", ") || "N/A"}`);
  }
  return lines.join("\n");
}

/**
 * Translates the list_workflows filters into a search `query` expression.
 *
 * NOTE(review): clause syntax follows the form used by the Conductor UI
 * (`field IN (value)`, `startTime>n`); confirm against the indexing backend
 * of the target deployment.
 *
 * @returns The expression, or `undefined` when no filter was supplied.
 */
function buildWorkflowSearchQuery(args: Args): string | undefined {
  const clauses: string[] = [];

  const workflowName = optionalString(args, "workflowName");
  if (workflowName !== undefined) clauses.push(`workflowType IN (${workflowName})`);

  const status = optionalString(args, "status");
  if (status !== undefined) clauses.push(`status IN (${status})`);

  const startedAfter = optionalNumber(args, "startTime");
  if (startedAfter !== undefined) clauses.push(`startTime>${startedAfter}`);

  // Per the tool schema, `endTime` bounds the *start* time from above.
  const startedBefore = optionalNumber(args, "endTime");
  if (startedBefore !== undefined) clauses.push(`startTime<${startedBefore}`);

  return clauses.length > 0 ? clauses.join(" AND ") : undefined;
}

// ---------------------------------------------------------------------------
// Resources
// ---------------------------------------------------------------------------

const resources: Resource[] = [
  {
    uri: "conductor://workflows/definitions",
    name: "Workflow Definitions",
    description: "List of all registered workflow definitions in Conductor",
    mimeType: "application/json",
  },
  {
    uri: "conductor://tasks/definitions",
    name: "Task Definitions",
    description: "List of all registered task definitions in Conductor",
    mimeType: "application/json",
  },
  {
    uri: "conductor://workflows/running",
    name: "Running Workflows",
    description: "List of currently running workflow executions",
    mimeType: "application/json",
  },
  {
    uri: "conductor://workflows/failed",
    name: "Failed Workflows",
    description: "List of recently failed workflow executions",
    mimeType: "application/json",
  },
];

/** Conductor GET request backing each resource URI. */
const resourceRequests: Record<string, { path: string; params?: Record<string, unknown> }> = {
  "conductor://workflows/definitions": { path: "/metadata/workflow" },
  "conductor://tasks/definitions": { path: "/metadata/taskdefs" },
  "conductor://workflows/running": {
    path: "/workflow/search",
    params: { query: "status IN (RUNNING)", start: 0, size: 100 },
  },
  "conductor://workflows/failed": {
    path: "/workflow/search",
    params: { query: "status IN (FAILED)", start: 0, size: 100 },
  },
};

// ---------------------------------------------------------------------------
// Prompts
// ---------------------------------------------------------------------------

const prompts: Prompt[] = [
  {
    name: "troubleshoot_workflow",
    description: "Troubleshoot a failed or stuck workflow",
    arguments: [
      {
        name: "workflowId",
        description: "The workflow execution ID to troubleshoot",
        required: true,
      },
    ],
  },
  {
    name: "analyze_failures",
    description: "Analyze recent workflow failures and identify patterns",
    arguments: [
      {
        name: "workflowName",
        description: "Optional: Filter by specific workflow name",
        required: false,
      },
      {
        name: "hours",
        description: "Number of hours to look back (default: 24)",
        required: false,
      },
    ],
  },
  {
    name: "create_workflow",
    description: "Guide to create a new workflow definition",
    arguments: [
      {
        name: "workflowName",
        description: "Name for the new workflow",
        required: true,
      },
      {
        name: "description",
        description: "Description of what the workflow does",
        required: true,
      },
    ],
  },
  {
    name: "monitor_workflow",
    description: "Monitor a running workflow execution",
    arguments: [
      {
        name: "workflowId",
        description: "The workflow execution ID to monitor",
        required: true,
      },
    ],
  },
];

// ---------------------------------------------------------------------------
// Tools
// ---------------------------------------------------------------------------

const tools: Tool[] = [
  {
    name: "list_workflows",
    description:
      "List workflow executions with optional filters. Returns a list of workflow executions matching the criteria.",
    inputSchema: {
      type: "object",
      properties: {
        workflowName: {
          type: "string",
          description: "Filter by workflow name/type",
        },
        status: {
          type: "string",
          description:
            "Filter by workflow status (RUNNING, COMPLETED, FAILED, TIMED_OUT, TERMINATED, PAUSED)",
          enum: ["RUNNING", "COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED", "PAUSED"],
        },
        startTime: {
          type: "number",
          description: "Filter workflows started after this time (epoch milliseconds)",
        },
        endTime: {
          type: "number",
          description: "Filter workflows started before this time (epoch milliseconds)",
        },
        freeText: {
          type: "string",
          description: "Free text search across workflow executions",
        },
      },
    },
  },
  {
    name: "get_workflow_status",
    description:
      "Get the current status and details of a specific workflow execution by its ID. Returns complete workflow execution details including tasks, input/output, and current status.",
    inputSchema: {
      type: "object",
      properties: {
        workflowId: {
          type: "string",
          description: "The unique workflow execution ID",
        },
        includeTaskDetails: {
          type: "boolean",
          description: "Include detailed task information (default: true)",
        },
      },
      required: ["workflowId"],
    },
  },
  {
    name: "start_workflow",
    description:
      "Start a new workflow execution. Returns the workflow execution ID of the newly started workflow.",
    inputSchema: {
      type: "object",
      properties: {
        workflowName: {
          type: "string",
          description: "Name of the workflow to start",
        },
        version: {
          type: "number",
          description: "Version of the workflow (optional, defaults to latest)",
        },
        input: {
          type: "object",
          description: "Input parameters for the workflow as a JSON object",
        },
        correlationId: {
          type: "string",
          description: "Optional correlation ID for tracking",
        },
        priority: {
          type: "number",
          description: "Workflow execution priority (0-99, default: 0)",
        },
      },
      required: ["workflowName"],
    },
  },
  {
    name: "pause_workflow",
    description:
      "Pause a running workflow execution. The workflow will pause and can be resumed later.",
    inputSchema: {
      type: "object",
      properties: {
        workflowId: {
          type: "string",
          description: "The workflow execution ID to pause",
        },
      },
      required: ["workflowId"],
    },
  },
  {
    name: "resume_workflow",
    description:
      "Resume a paused workflow execution. The workflow will continue from where it was paused.",
    inputSchema: {
      type: "object",
      properties: {
        workflowId: {
          type: "string",
          description: "The workflow execution ID to resume",
        },
      },
      required: ["workflowId"],
    },
  },
  {
    name: "terminate_workflow",
    description:
      "Terminate a workflow execution. This will stop the workflow and mark it as terminated.",
    inputSchema: {
      type: "object",
      properties: {
        workflowId: {
          type: "string",
          description: "The workflow execution ID to terminate",
        },
        reason: {
          type: "string",
          description: "Reason for termination",
        },
      },
      required: ["workflowId"],
    },
  },
  {
    name: "restart_workflow",
    description:
      "Restart a workflow execution from the beginning. This creates a new execution with the same input.",
    inputSchema: {
      type: "object",
      properties: {
        workflowId: {
          type: "string",
          description: "The workflow execution ID to restart",
        },
        useLatestDefinition: {
          type: "boolean",
          description: "Use the latest workflow definition (default: false)",
        },
      },
      required: ["workflowId"],
    },
  },
  {
    name: "retry_workflow",
    description: "Retry a failed workflow execution from the last failed task.",
    inputSchema: {
      type: "object",
      properties: {
        workflowId: {
          type: "string",
          description: "The workflow execution ID to retry",
        },
        resumeSubworkflowTasks: {
          type: "boolean",
          description: "Resume subworkflow tasks (default: false)",
        },
      },
      required: ["workflowId"],
    },
  },
  {
    name: "search_workflows",
    description:
      "Advanced search for workflow executions using query syntax. Supports complex queries with multiple criteria.",
    inputSchema: {
      type: "object",
      properties: {
        query: {
          type: "string",
          description: "Query string (e.g., 'workflowType=MyWorkflow AND status=FAILED')",
        },
        start: {
          type: "number",
          description: "Start index for pagination (default: 0)",
        },
        size: {
          type: "number",
          description: "Number of results to return (default: 100)",
        },
        sort: {
          type: "string",
          description: "Sort field and order (e.g., 'startTime:DESC')",
        },
      },
      required: ["query"],
    },
  },
  {
    name: "get_workflow_definition",
    description:
      "Get the definition of a workflow by name and version. Returns the complete workflow definition including all tasks and configuration.",
    inputSchema: {
      type: "object",
      properties: {
        workflowName: {
          type: "string",
          description: "Name of the workflow",
        },
        version: {
          type: "number",
          description: "Version of the workflow (optional, defaults to latest)",
        },
      },
      required: ["workflowName"],
    },
  },
  {
    name: "list_workflow_definitions",
    description:
      "List all registered workflow definitions. Returns metadata about all workflows registered in Conductor.",
    inputSchema: {
      type: "object",
      properties: {
        access: {
          type: "string",
          description: "Filter by access type (READ or CREATE)",
          enum: ["READ", "CREATE"],
        },
        tagKey: {
          type: "string",
          description: "Filter by tag key",
        },
        tagValue: {
          type: "string",
          description: "Filter by tag value",
        },
      },
    },
  },
  {
    name: "create_workflow_definition",
    description:
      "Create or update a workflow definition. If the workflow already exists, it will be updated.",
    inputSchema: {
      type: "object",
      properties: {
        definition: {
          type: "object",
          description: "Complete workflow definition as a JSON object",
        },
        overwrite: {
          type: "boolean",
          description: "Overwrite existing definition (default: true)",
        },
      },
      required: ["definition"],
    },
  },
  {
    name: "get_task_details",
    description:
      "Get details of a specific task execution by task ID. Returns task status, input/output, and execution details.",
    inputSchema: {
      type: "object",
      properties: {
        taskId: {
          type: "string",
          description: "The unique task execution ID",
        },
      },
      required: ["taskId"],
    },
  },
  {
    name: "get_task_logs",
    description:
      "Get execution logs for a specific task. Returns log entries generated during task execution.",
    inputSchema: {
      type: "object",
      properties: {
        taskId: {
          type: "string",
          description: "The unique task execution ID",
        },
      },
      required: ["taskId"],
    },
  },
  {
    name: "update_task_status",
    description:
      "Update the status of a task execution. This is typically used by workers to update task status.",
    inputSchema: {
      type: "object",
      properties: {
        taskId: {
          type: "string",
          description: "The unique task execution ID",
        },
        workflowInstanceId: {
          type: "string",
          description: "The workflow instance ID",
        },
        status: {
          type: "string",
          description: "New task status",
          enum: ["IN_PROGRESS", "FAILED", "FAILED_WITH_TERMINAL_ERROR", "COMPLETED"],
        },
        output: {
          type: "object",
          description: "Task output data",
        },
        logs: {
          type: "array",
          description: "Task execution logs",
          items: {
            type: "object",
          },
        },
      },
      required: ["taskId", "workflowInstanceId", "status"],
    },
  },
  {
    name: "get_task_definition",
    description:
      "Get the definition of a task by name. Returns the task definition including configuration and metadata.",
    inputSchema: {
      type: "object",
      properties: {
        taskName: {
          type: "string",
          description: "Name of the task",
        },
      },
      required: ["taskName"],
    },
  },
  {
    name: "list_task_definitions",
    description:
      "List all registered task definitions. Returns metadata about all tasks registered in Conductor.",
    inputSchema: {
      type: "object",
      properties: {
        access: {
          type: "string",
          description: "Filter by access type (READ or CREATE)",
          enum: ["READ", "CREATE"],
        },
      },
    },
  },
  {
    name: "create_task_definition",
    description:
      "Create or update a task definition. If the task already exists, it will be updated.",
    inputSchema: {
      type: "object",
      properties: {
        definition: {
          type: "object",
          description: "Complete task definition as a JSON object",
        },
      },
      required: ["definition"],
    },
  },
  {
    name: "get_event_handlers",
    description:
      "Get all event handlers or filter by event and active status. Event handlers define how Conductor responds to external events.",
    inputSchema: {
      type: "object",
      properties: {
        event: {
          type: "string",
          description: "Filter by event name",
        },
        activeOnly: {
          type: "boolean",
          description: "Return only active event handlers (default: true)",
        },
      },
    },
  },
];

// ---------------------------------------------------------------------------
// Server
// ---------------------------------------------------------------------------

const server = new Server(
  {
    name: "conductor-mcp",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
      resources: {},
      prompts: {},
    },
  }
);

// Handle list tools request
server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools }));

// Handle list resources request
server.setRequestHandler(ListResourcesRequestSchema, async () => ({ resources }));

// Handle read resource request: fetch the backing Conductor endpoint and
// return its payload as pretty-printed JSON.
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const { uri } = request.params;

  try {
    const target = resourceRequests[uri];
    if (target === undefined) {
      throw new Error(`Unknown resource: ${uri}`);
    }

    const response = await conductorClient.get(target.path, { params: target.params });
    return {
      contents: [
        {
          uri,
          mimeType: "application/json",
          text: JSON.stringify(response.data, null, 2),
        },
      ],
    };
  } catch (error: unknown) {
    throw new Error(formatError(error, `read resource ${uri}`));
  }
});

// Handle list prompts request
server.setRequestHandler(ListPromptsRequestSchema, async () => ({ prompts }));

// Handle get prompt request: expand a named prompt into one user message.
// Throws when the prompt is unknown or a required argument is missing.
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  const { name } = request.params;
  const args: Args = request.params.arguments ?? {};

  switch (name) {
    case "troubleshoot_workflow": {
      const workflowId = requireString(args, "workflowId");
      return userPrompt(
        [
          `I need help troubleshooting workflow ${workflowId}. Please:`,
          "1. Get the current status of the workflow",
          "2. Identify any failed tasks",
          "3. Check the task logs for errors",
          "4. Suggest possible solutions based on the error messages",
          "5. Recommend next steps to resolve the issue",
        ].join("\n")
      );
    }

    case "analyze_failures": {
      const workflowName = optionalString(args, "workflowName");
      const requestedHours = optionalNumber(args, "hours");
      const hours = requestedHours !== undefined && requestedHours > 0 ? requestedHours : 24;

      let intro = `List all failed workflows in the last ${hours} hours`;
      if (workflowName !== undefined) {
        intro += ` for workflow type "${workflowName}"`;
      }

      return userPrompt(
        [
          `${intro}. Then analyze the failures to identify:`,
          "1. Common error patterns",
          "2. Most frequently failing tasks",
          "3. Potential root causes",
          "4. Recommendations to prevent future failures",
        ].join("\n")
      );
    }

    case "create_workflow": {
      const workflowName = requireString(args, "workflowName");
      const description = requireString(args, "description");
      return userPrompt(
        [
          `I want to create a new workflow called "${workflowName}".`,
          `Description: ${description}`,
          "",
          "Please help me:",
          "1. Show me existing workflow definitions as examples",
          "2. Guide me through creating the workflow definition JSON",
          "3. Help me define the necessary tasks",
          "4. Create the task definitions if needed",
          "5. Register the workflow definition in Conductor",
        ].join("\n")
      );
    }

    case "monitor_workflow": {
      const workflowId = requireString(args, "workflowId");
      return userPrompt(
        [
          `Please monitor workflow ${workflowId} and provide:`,
          "1. Current status and progress",
          "2. List of completed tasks",
          "3. Currently running tasks",
          "4. Pending tasks",
          "5. Any warnings or issues",
          "6. Estimated time to completion (if possible)",
        ].join("\n")
      );
    }

    default:
      throw new Error(`Unknown prompt: ${name}`);
  }
});

// Handle tool execution. Every failure (validation or Conductor error) is
// returned as an `isError` result rather than thrown.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name } = request.params;
  const args: Args = request.params.arguments ?? {};

  try {
    switch (name) {
      case "list_workflows": {
        const response = await conductorClient.get("/workflow/search", {
          params: {
            start: 0,
            size: 100,
            query: buildWorkflowSearchQuery(args),
            freeText: optionalString(args, "freeText"),
          },
        });
        return jsonResult(response.data);
      }

      case "get_workflow_status": {
        const workflowId = args.workflowId as string;
        const validation = validateWorkflowId(workflowId);
        if (!validation.valid) {
          throw new ValidationError(validation.error ?? "Invalid workflow ID");
        }

        // Always send the flag explicitly: omitting it leaves the decision to
        // the server default, so `includeTaskDetails: false` had no effect.
        const includeTasks = optionalBoolean(args, "includeTaskDetails", true);
        const response = await conductorClient.get(`/workflow/${pathSegment(workflowId)}`, {
          params: { includeTasks },
        });

        const summary = summarizeWorkflow(response.data);
        return textResult(
          `${summary}\n\n📄 Full Details:\n${JSON.stringify(response.data, null, 2)}`
        );
      }

      case "start_workflow": {
        const workflowName = requireString(args, "workflowName", "Workflow name");
        const version = optionalNumber(args, "version");
        const correlationId = optionalString(args, "correlationId");

        const requestBody: Record<string, unknown> = {
          name: workflowName,
          input: isRecord(args.input) ? args.input : {},
          priority: optionalNumber(args, "priority") ?? 0,
        };
        if (version !== undefined) requestBody.version = version;
        if (correlationId !== undefined) requestBody.correlationId = correlationId;

        const response = await conductorClient.post("/workflow", requestBody);

        const versionLine = version !== undefined ? `📌 Version: ${version}\n` : "";
        const correlationLine =
          correlationId !== undefined ? `🔗 Correlation ID: ${correlationId}\n` : "";
        return textResult(
          `✅ Workflow started successfully!\n\n🆔 Workflow ID: ${response.data}\n📋 Workflow Name: ${workflowName}\n${versionLine}${correlationLine}\n💡 Tip: Use get_workflow_status with this ID to monitor progress.`
        );
      }

      case "pause_workflow": {
        const workflowId = requireString(args, "workflowId", "Workflow ID");
        await conductorClient.put(`/workflow/${pathSegment(workflowId)}/pause`);
        return textResult(`Workflow ${workflowId} paused successfully.`);
      }

      case "resume_workflow": {
        const workflowId = requireString(args, "workflowId", "Workflow ID");
        await conductorClient.put(`/workflow/${pathSegment(workflowId)}/resume`);
        return textResult(`Workflow ${workflowId} resumed successfully.`);
      }

      case "terminate_workflow": {
        const workflowId = requireString(args, "workflowId", "Workflow ID");
        const reason = optionalString(args, "reason") ?? "Terminated via MCP";
        await conductorClient.delete(`/workflow/${pathSegment(workflowId)}`, {
          params: { reason },
        });
        return textResult(`Workflow ${workflowId} terminated successfully. Reason: ${reason}`);
      }

      case "restart_workflow": {
        const workflowId = requireString(args, "workflowId", "Workflow ID");
        const useLatestDefinitions = optionalBoolean(args, "useLatestDefinition", false);
        const response = await conductorClient.post(
          `/workflow/${pathSegment(workflowId)}/restart`,
          null,
          { params: { useLatestDefinitions } }
        );
        return textResult(`Workflow restarted successfully. New Workflow ID: ${response.data}`);
      }

      case "retry_workflow": {
        const workflowId = requireString(args, "workflowId", "Workflow ID");
        const resumeSubworkflowTasks = optionalBoolean(args, "resumeSubworkflowTasks", false);
        await conductorClient.post(`/workflow/${pathSegment(workflowId)}/retry`, null, {
          params: { resumeSubworkflowTasks },
        });
        return textResult(`Workflow ${workflowId} retry initiated successfully.`);
      }

      case "search_workflows": {
        const query = requireString(args, "query", "Query");
        const response = await conductorClient.get("/workflow/search-v2", {
          params: {
            start: optionalNumber(args, "start") ?? 0,
            size: optionalNumber(args, "size") ?? 100,
            query,
            sort: optionalString(args, "sort"),
          },
        });
        return jsonResult(response.data);
      }

      case "get_workflow_definition": {
        const workflowName = requireString(args, "workflowName", "Workflow name");
        // The version is sent as a query parameter; axios drops it when
        // undefined, which requests the latest version.
        // NOTE(review): assumes the target server reads `version` from the
        // query string rather than a path segment — confirm.
        const response = await conductorClient.get(
          `/metadata/workflow/${pathSegment(workflowName)}`,
          { params: { version: optionalNumber(args, "version") } }
        );
        return jsonResult(response.data);
      }

      case "list_workflow_definitions": {
        const response = await conductorClient.get("/metadata/workflow", {
          params: {
            access: optionalString(args, "access"),
            tagKey: optionalString(args, "tagKey"),
            tagValue: optionalString(args, "tagValue"),
          },
        });
        return jsonResult(response.data);
      }

      case "create_workflow_definition": {
        const definition = requireObject(args, "definition", "Workflow definition");
        const overwrite = optionalBoolean(args, "overwrite", true);
        await conductorClient.post("/metadata/workflow", definition, {
          params: { overwrite },
        });
        return textResult("Workflow definition created/updated successfully.");
      }

      case "get_task_details": {
        const taskId = requireString(args, "taskId", "Task ID");
        const response = await conductorClient.get(`/tasks/${pathSegment(taskId)}`);
        return jsonResult(response.data);
      }

      case "get_task_logs": {
        const taskId = requireString(args, "taskId", "Task ID");
        const response = await conductorClient.get(`/tasks/${pathSegment(taskId)}/log`);
        return jsonResult(response.data);
      }

      case "update_task_status": {
        const taskId = requireString(args, "taskId", "Task ID");
        const workflowInstanceId = requireString(
          args,
          "workflowInstanceId",
          "Workflow instance ID"
        );
        const status = requireString(args, "status", "Status");

        await conductorClient.post("/tasks", {
          workflowInstanceId,
          taskId,
          status,
          outputData: isRecord(args.output) ? args.output : {},
          logs: Array.isArray(args.logs) ? args.logs : [],
        });
        return textResult(`Task ${taskId} status updated to ${status} successfully.`);
      }

      case "get_task_definition": {
        const taskName = requireString(args, "taskName", "Task name");
        const response = await conductorClient.get(
          `/metadata/taskdefs/${pathSegment(taskName)}`
        );
        return jsonResult(response.data);
      }

      case "list_task_definitions": {
        const response = await conductorClient.get("/metadata/taskdefs", {
          params: { access: optionalString(args, "access") },
        });
        return jsonResult(response.data);
      }

      case "create_task_definition": {
        const definition = requireObject(args, "definition", "Task definition");
        // The endpoint is called with a list of definitions.
        await conductorClient.post("/metadata/taskdefs", [definition]);
        return textResult("Task definition created/updated successfully.");
      }

      case "get_event_handlers": {
        const event = optionalString(args, "event");
        const activeOnly = optionalBoolean(args, "activeOnly", true);
        // NOTE(review): assumes handlers for one event are addressed by path
        // (`/event/{event}`) and that bare `/event` lists all — confirm.
        const path = event === undefined ? "/event" : `/event/${pathSegment(event)}`;
        const response = await conductorClient.get(path, { params: { activeOnly } });
        return jsonResult(response.data);
      }

      default:
        return errorResult(
          `❌ Unknown tool: ${name}\n\n💡 Suggestion: Use list_tools to see available tools.`
        );
    }
  } catch (error: unknown) {
    if (error instanceof ValidationError) {
      return errorResult(`❌ Validation Error: ${error.message}`);
    }
    return errorResult(formatError(error, name));
  }
});

/** Connects the MCP server to stdio. Diagnostics go to stderr because stdout carries the protocol. */
async function main(): Promise<void> {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Conductor MCP Server running on stdio");
  console.error(`Connected to Conductor server at: ${config.baseUrl}${config.apiPath}`);
}

main().catch((error: unknown) => {
  console.error("Fatal error in main():", error);
  process.exit(1);
});

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/opensensor/conductor-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.