#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { CallToolRequestSchema, ListToolsRequestSchema, } from "@modelcontextprotocol/sdk/types.js";
import axios from "axios";
// Get configuration from environment variables
const config = {
baseUrl: process.env.CONDUCTOR_SERVER_URL || "http://localhost:8080",
apiPath: process.env.CONDUCTOR_API_PATH || "/api",
};
// Create axios instance for Conductor API
const conductorClient = axios.create({
baseURL: `${config.baseUrl}${config.apiPath}`,
headers: {
"Content-Type": "application/json",
},
timeout: 30000,
});
// Define available tools
const tools = [
{
name: "list_workflows",
description: "List workflow executions with optional filters. Returns a list of workflow executions matching the criteria.",
inputSchema: {
type: "object",
properties: {
workflowName: {
type: "string",
description: "Filter by workflow name/type",
},
status: {
type: "string",
description: "Filter by workflow status (RUNNING, COMPLETED, FAILED, TIMED_OUT, TERMINATED, PAUSED)",
enum: ["RUNNING", "COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED", "PAUSED"],
},
startTime: {
type: "number",
description: "Filter workflows started after this time (epoch milliseconds)",
},
endTime: {
type: "number",
description: "Filter workflows started before this time (epoch milliseconds)",
},
freeText: {
type: "string",
description: "Free text search across workflow executions",
},
},
},
},
{
name: "get_workflow_status",
description: "Get the current status and details of a specific workflow execution by its ID. Returns complete workflow execution details including tasks, input/output, and current status.",
inputSchema: {
type: "object",
properties: {
workflowId: {
type: "string",
description: "The unique workflow execution ID",
},
includeTaskDetails: {
type: "boolean",
description: "Include detailed task information (default: true)",
},
},
required: ["workflowId"],
},
},
{
name: "start_workflow",
description: "Start a new workflow execution. Returns the workflow execution ID of the newly started workflow.",
inputSchema: {
type: "object",
properties: {
workflowName: {
type: "string",
description: "Name of the workflow to start",
},
version: {
type: "number",
description: "Version of the workflow (optional, defaults to latest)",
},
input: {
type: "object",
description: "Input parameters for the workflow as a JSON object",
},
correlationId: {
type: "string",
description: "Optional correlation ID for tracking",
},
priority: {
type: "number",
description: "Workflow execution priority (0-99, default: 0)",
},
},
required: ["workflowName"],
},
},
{
name: "pause_workflow",
description: "Pause a running workflow execution. The workflow will pause and can be resumed later.",
inputSchema: {
type: "object",
properties: {
workflowId: {
type: "string",
description: "The workflow execution ID to pause",
},
},
required: ["workflowId"],
},
},
{
name: "resume_workflow",
description: "Resume a paused workflow execution. The workflow will continue from where it was paused.",
inputSchema: {
type: "object",
properties: {
workflowId: {
type: "string",
description: "The workflow execution ID to resume",
},
},
required: ["workflowId"],
},
},
{
name: "terminate_workflow",
description: "Terminate a workflow execution. This will stop the workflow and mark it as terminated.",
inputSchema: {
type: "object",
properties: {
workflowId: {
type: "string",
description: "The workflow execution ID to terminate",
},
reason: {
type: "string",
description: "Reason for termination",
},
},
required: ["workflowId"],
},
},
{
name: "restart_workflow",
description: "Restart a workflow execution from the beginning. This creates a new execution with the same input.",
inputSchema: {
type: "object",
properties: {
workflowId: {
type: "string",
description: "The workflow execution ID to restart",
},
useLatestDefinition: {
type: "boolean",
description: "Use the latest workflow definition (default: false)",
},
},
required: ["workflowId"],
},
},
{
name: "retry_workflow",
description: "Retry a failed workflow execution from the last failed task.",
inputSchema: {
type: "object",
properties: {
workflowId: {
type: "string",
description: "The workflow execution ID to retry",
},
resumeSubworkflowTasks: {
type: "boolean",
description: "Resume subworkflow tasks (default: false)",
},
},
required: ["workflowId"],
},
},
{
name: "search_workflows",
description: "Advanced search for workflow executions using query syntax. Supports complex queries with multiple criteria.",
inputSchema: {
type: "object",
properties: {
query: {
type: "string",
description: "Query string (e.g., 'workflowType=MyWorkflow AND status=FAILED')",
},
start: {
type: "number",
description: "Start index for pagination (default: 0)",
},
size: {
type: "number",
description: "Number of results to return (default: 100)",
},
sort: {
type: "string",
description: "Sort field and order (e.g., 'startTime:DESC')",
},
},
required: ["query"],
},
},
{
name: "get_workflow_definition",
description: "Get the definition of a workflow by name and version. Returns the complete workflow definition including all tasks and configuration.",
inputSchema: {
type: "object",
properties: {
workflowName: {
type: "string",
description: "Name of the workflow",
},
version: {
type: "number",
description: "Version of the workflow (optional, defaults to latest)",
},
},
required: ["workflowName"],
},
},
{
name: "list_workflow_definitions",
description: "List all registered workflow definitions. Returns metadata about all workflows registered in Conductor.",
inputSchema: {
type: "object",
properties: {
access: {
type: "string",
description: "Filter by access type (READ or CREATE)",
enum: ["READ", "CREATE"],
},
tagKey: {
type: "string",
description: "Filter by tag key",
},
tagValue: {
type: "string",
description: "Filter by tag value",
},
},
},
},
{
name: "create_workflow_definition",
description: "Create or update a workflow definition. If the workflow already exists, it will be updated.",
inputSchema: {
type: "object",
properties: {
definition: {
type: "object",
description: "Complete workflow definition as a JSON object",
},
overwrite: {
type: "boolean",
description: "Overwrite existing definition (default: true)",
},
},
required: ["definition"],
},
},
{
name: "get_task_details",
description: "Get details of a specific task execution by task ID. Returns task status, input/output, and execution details.",
inputSchema: {
type: "object",
properties: {
taskId: {
type: "string",
description: "The unique task execution ID",
},
},
required: ["taskId"],
},
},
{
name: "get_task_logs",
description: "Get execution logs for a specific task. Returns log entries generated during task execution.",
inputSchema: {
type: "object",
properties: {
taskId: {
type: "string",
description: "The unique task execution ID",
},
},
required: ["taskId"],
},
},
{
name: "update_task_status",
description: "Update the status of a task execution. This is typically used by workers to update task status.",
inputSchema: {
type: "object",
properties: {
taskId: {
type: "string",
description: "The unique task execution ID",
},
workflowInstanceId: {
type: "string",
description: "The workflow instance ID",
},
status: {
type: "string",
description: "New task status",
enum: ["IN_PROGRESS", "FAILED", "FAILED_WITH_TERMINAL_ERROR", "COMPLETED"],
},
output: {
type: "object",
description: "Task output data",
},
logs: {
type: "array",
description: "Task execution logs",
items: {
type: "object",
},
},
},
required: ["taskId", "workflowInstanceId", "status"],
},
},
{
name: "get_task_definition",
description: "Get the definition of a task by name. Returns the task definition including configuration and metadata.",
inputSchema: {
type: "object",
properties: {
taskName: {
type: "string",
description: "Name of the task",
},
},
required: ["taskName"],
},
},
{
name: "list_task_definitions",
description: "List all registered task definitions. Returns metadata about all tasks registered in Conductor.",
inputSchema: {
type: "object",
properties: {
access: {
type: "string",
description: "Filter by access type (READ or CREATE)",
enum: ["READ", "CREATE"],
},
},
},
},
{
name: "create_task_definition",
description: "Create or update a task definition. If the task already exists, it will be updated.",
inputSchema: {
type: "object",
properties: {
definition: {
type: "object",
description: "Complete task definition as a JSON object",
},
},
required: ["definition"],
},
},
{
name: "get_event_handlers",
description: "Get all event handlers or filter by event and active status. Event handlers define how Conductor responds to external events.",
inputSchema: {
type: "object",
properties: {
event: {
type: "string",
description: "Filter by event name",
},
activeOnly: {
type: "boolean",
description: "Return only active event handlers (default: true)",
},
},
},
},
];
// Create MCP server
const server = new Server({
name: "conductor-mcp",
version: "1.0.0",
}, {
capabilities: {
tools: {},
},
});
// Handle list tools request
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools,
};
});
// Handle tool execution
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args = {} } = request.params;
try {
switch (name) {
case "list_workflows": {
const params = {
start: 0,
size: 100,
};
if (args.workflowName)
params.workflowType = args.workflowName;
if (args.status)
params.status = args.status;
if (args.startTime)
params.startTime = args.startTime;
if (args.endTime)
params.endTime = args.endTime;
if (args.freeText)
params.freeText = args.freeText;
const response = await conductorClient.get("/workflow/search", { params });
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
case "get_workflow_status": {
const { workflowId, includeTaskDetails = true } = args;
const url = `/workflow/${workflowId}`;
const params = includeTaskDetails ? { includeTasks: true } : {};
const response = await conductorClient.get(url, { params });
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
case "start_workflow": {
const { workflowName, version, input = {}, correlationId, priority = 0 } = args;
const requestBody = {
name: workflowName,
input,
priority,
};
if (version)
requestBody.version = version;
if (correlationId)
requestBody.correlationId = correlationId;
const response = await conductorClient.post("/workflow", requestBody);
return {
content: [
{
type: "text",
text: `Workflow started successfully. Workflow ID: ${response.data}`,
},
],
};
}
case "pause_workflow": {
const { workflowId } = args;
await conductorClient.put(`/workflow/${workflowId}/pause`);
return {
content: [
{
type: "text",
text: `Workflow ${workflowId} paused successfully.`,
},
],
};
}
case "resume_workflow": {
const { workflowId } = args;
await conductorClient.put(`/workflow/${workflowId}/resume`);
return {
content: [
{
type: "text",
text: `Workflow ${workflowId} resumed successfully.`,
},
],
};
}
case "terminate_workflow": {
const { workflowId, reason = "Terminated via MCP" } = args;
await conductorClient.delete(`/workflow/${workflowId}`, {
params: { reason },
});
return {
content: [
{
type: "text",
text: `Workflow ${workflowId} terminated successfully. Reason: ${reason}`,
},
],
};
}
case "restart_workflow": {
const { workflowId, useLatestDefinition = false } = args;
const response = await conductorClient.post(`/workflow/${workflowId}/restart`, null, {
params: { useLatestDefinitions: useLatestDefinition },
});
return {
content: [
{
type: "text",
text: `Workflow restarted successfully. New Workflow ID: ${response.data}`,
},
],
};
}
case "retry_workflow": {
const { workflowId, resumeSubworkflowTasks = false } = args;
await conductorClient.post(`/workflow/${workflowId}/retry`, null, {
params: { resumeSubworkflowTasks },
});
return {
content: [
{
type: "text",
text: `Workflow ${workflowId} retry initiated successfully.`,
},
],
};
}
case "search_workflows": {
const { query, start = 0, size = 100, sort } = args;
const params = {
start,
size,
query,
};
if (sort)
params.sort = sort;
const response = await conductorClient.get("/workflow/search-v2", { params });
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
case "get_workflow_definition": {
const { workflowName, version } = args;
const url = version
? `/metadata/workflow/${workflowName}/${version}`
: `/metadata/workflow/${workflowName}`;
const response = await conductorClient.get(url);
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
case "list_workflow_definitions": {
const params = {};
if (args.access)
params.access = args.access;
if (args.tagKey)
params.tagKey = args.tagKey;
if (args.tagValue)
params.tagValue = args.tagValue;
const response = await conductorClient.get("/metadata/workflow", { params });
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
case "create_workflow_definition": {
const { definition, overwrite = true } = args;
const response = await conductorClient.post(`/metadata/workflow`, definition, {
params: { overwrite },
});
return {
content: [
{
type: "text",
text: `Workflow definition created/updated successfully.`,
},
],
};
}
case "get_task_details": {
const { taskId } = args;
const response = await conductorClient.get(`/tasks/${taskId}`);
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
case "get_task_logs": {
const { taskId } = args;
const response = await conductorClient.get(`/tasks/${taskId}/log`);
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
case "update_task_status": {
const { taskId, workflowInstanceId, status, output = {}, logs = [] } = args;
const taskUpdate = {
workflowInstanceId,
taskId,
status,
outputData: output,
logs,
};
await conductorClient.post("/tasks", taskUpdate);
return {
content: [
{
type: "text",
text: `Task ${taskId} status updated to ${status} successfully.`,
},
],
};
}
case "get_task_definition": {
const { taskName } = args;
const response = await conductorClient.get(`/metadata/taskdefs/${taskName}`);
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
case "list_task_definitions": {
const params = {};
if (args.access)
params.access = args.access;
const response = await conductorClient.get("/metadata/taskdefs", { params });
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
case "create_task_definition": {
const { definition } = args;
await conductorClient.post("/metadata/taskdefs", [definition]);
return {
content: [
{
type: "text",
text: `Task definition created/updated successfully.`,
},
],
};
}
case "get_event_handlers": {
const params = {};
if (args.event)
params.event = args.event;
if (args.activeOnly !== undefined)
params.activeOnly = args.activeOnly;
else
params.activeOnly = true;
const response = await conductorClient.get("/event", { params });
return {
content: [
{
type: "text",
text: JSON.stringify(response.data, null, 2),
},
],
};
}
default:
return {
content: [
{
type: "text",
text: `Unknown tool: ${name}`,
},
],
isError: true,
};
}
}
catch (error) {
const errorMessage = error.response?.data?.message || error.message || "Unknown error";
const statusCode = error.response?.status || 500;
return {
content: [
{
type: "text",
text: `Error executing ${name}: [${statusCode}] ${errorMessage}`,
},
],
isError: true,
};
}
});
// Start the server
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Conductor MCP Server running on stdio");
console.error(`Connected to Conductor server at: ${config.baseUrl}${config.apiPath}`);
}
main().catch((error) => {
console.error("Fatal error in main():", error);
process.exit(1);
});
//# sourceMappingURL=index.js.map