MCP Orchestrator Server
by mokafari
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListResourcesRequestSchema,
ListToolsRequestSchema,
ReadResourceRequestSchema,
ErrorCode,
McpError,
} from "@modelcontextprotocol/sdk/types.js";
import { readFileSync, writeFileSync, existsSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const TASKS_FILE = join(__dirname, '..', 'data', 'tasks.json');
interface Task {
id: string;
description: string;
status: 'pending' | 'in_progress' | 'completed';
assignedTo?: string;
result?: string;
dependencies?: string[];
}
// Initialize tasks from file or create empty object
let tasks: { [id: string]: Task } = {};
function loadTasks() {
try {
if (existsSync(TASKS_FILE)) {
const data = readFileSync(TASKS_FILE, 'utf8');
tasks = JSON.parse(data);
debug('Loaded tasks from file:', tasks);
} else {
debug('No tasks file found, starting with empty tasks');
saveTasks(); // Create the file
}
} catch (error) {
console.error('Error loading tasks:', error);
process.exit(1);
}
}
function saveTasks() {
try {
// Create data directory if it doesn't exist
const dataDir = dirname(TASKS_FILE);
if (!existsSync(dataDir)) {
const { mkdirSync } = require('fs');
mkdirSync(dataDir, { recursive: true });
}
writeFileSync(TASKS_FILE, JSON.stringify(tasks, null, 2));
debug('Saved tasks to file');
} catch (error) {
console.error('Error saving tasks:', error);
}
}
function debug(message: string, ...args: any[]) {
const timestamp = new Date().toISOString();
console.error(`[${timestamp}] DEBUG: ${message}`, ...args);
}
// Create MCP server
const server = new Server(
{
name: "orchestrator-server",
version: "1.0.0",
},
{
capabilities: {
resources: {},
tools: {},
},
}
);
// Tool implementations
server.setRequestHandler(CallToolRequestSchema, async (request) => {
debug(`Handling tool request: ${request.params.name}`);
debug(`Request arguments:`, request.params.arguments);
try {
switch (request.params.name) {
case "create_task": {
const { id, description, dependencies } = request.params.arguments as {
id: string;
description: string;
dependencies?: string[];
};
debug(`Creating task ${id}: ${description}`);
if (tasks[id]) {
throw new McpError(ErrorCode.InvalidRequest, `Task ${id} already exists`);
}
// Verify dependencies exist
if (dependencies) {
for (const depId of dependencies) {
if (!tasks[depId]) {
throw new McpError(ErrorCode.InvalidRequest, `Dependency task ${depId} not found`);
}
}
}
const task: Task = {
id,
description,
status: 'pending',
dependencies
};
tasks[id] = task;
saveTasks();
debug(`Created task ${id}`);
return {
content: [{
type: "text",
text: JSON.stringify(task, null, 2)
}]
};
}
case "update_task": {
const { task_id, description, dependencies } = request.params.arguments as {
task_id: string;
description?: string;
dependencies?: string[];
};
debug(`Updating task ${task_id}`);
const task = tasks[task_id];
if (!task) {
throw new McpError(ErrorCode.InvalidRequest, `Task ${task_id} not found`);
}
if (task.status !== 'pending') {
throw new McpError(ErrorCode.InvalidRequest, `Cannot update task ${task_id} in ${task.status} state`);
}
// Verify dependencies exist and don't create cycles
if (dependencies) {
const visited = new Set<string>();
const checkCycle = (taskId: string): boolean => {
if (visited.has(taskId)) return true;
visited.add(taskId);
return (tasks[taskId]?.dependencies || []).some(depId => checkCycle(depId));
};
for (const depId of dependencies) {
if (!tasks[depId]) {
throw new McpError(ErrorCode.InvalidRequest, `Dependency task ${depId} not found`);
}
if (depId === task_id || checkCycle(depId)) {
throw new McpError(ErrorCode.InvalidRequest, `Dependencies would create a cycle`);
}
}
task.dependencies = dependencies;
}
if (description) {
task.description = description;
}
saveTasks();
debug(`Updated task ${task_id}`);
return {
content: [{
type: "text",
text: JSON.stringify(task, null, 2)
}]
};
}
case "delete_task": {
const { task_id } = request.params.arguments as {
task_id: string;
};
debug(`Deleting task ${task_id}`);
const task = tasks[task_id];
if (!task) {
throw new McpError(ErrorCode.InvalidRequest, `Task ${task_id} not found`);
}
// Check if any other tasks depend on this one
const dependentTasks = Object.values(tasks).filter(t =>
t.dependencies?.includes(task_id)
);
if (dependentTasks.length > 0) {
throw new McpError(
ErrorCode.InvalidRequest,
`Cannot delete task ${task_id} as it has dependent tasks: ${dependentTasks.map(t => t.id).join(', ')}`
);
}
delete tasks[task_id];
saveTasks();
debug(`Deleted task ${task_id}`);
return {
content: [{
type: "text",
text: JSON.stringify({ deleted: task_id }, null, 2)
}]
};
}
case "get_next_task": {
const { instance_id } = request.params.arguments as { instance_id: string };
debug(`Instance ${instance_id} requesting next task`);
// Find a pending task with no incomplete dependencies
const availableTask = Object.values(tasks).find(task => {
if (task.status !== 'pending') return false;
if (!task.dependencies) return true;
return task.dependencies.every(depId => tasks[depId]?.status === 'completed');
});
if (!availableTask) {
debug('No tasks available');
return {
content: [{
type: "text",
text: JSON.stringify({ status: 'no_tasks' }, null, 2)
}]
};
}
availableTask.status = 'in_progress';
availableTask.assignedTo = instance_id;
saveTasks();
debug(`Assigned task ${availableTask.id} to instance ${instance_id}`);
return {
content: [{
type: "text",
text: JSON.stringify(availableTask, null, 2)
}]
};
}
case "complete_task": {
const { task_id, instance_id, result } = request.params.arguments as {
task_id: string;
instance_id: string;
result: string;
};
debug(`Instance ${instance_id} completing task ${task_id}`);
const task = tasks[task_id];
if (!task) {
throw new McpError(ErrorCode.InvalidRequest, `Task ${task_id} not found`);
}
if (task.assignedTo !== instance_id) {
throw new McpError(ErrorCode.InvalidRequest, `Task ${task_id} is not assigned to instance ${instance_id}`);
}
task.status = 'completed';
task.result = result;
saveTasks();
debug(`Task ${task_id} completed by instance ${instance_id}`);
// Find tasks that can now be started
const unlockedTasks = Object.values(tasks).filter(t =>
t.status === 'pending' &&
t.dependencies?.includes(task_id) &&
t.dependencies.every(depId => tasks[depId]?.status === 'completed')
);
return {
content: [{
type: "text",
text: JSON.stringify({
completed_task: task,
unlocked_tasks: unlockedTasks
}, null, 2)
}]
};
}
case "get_task_status": {
debug('Getting task status');
return {
content: [{
type: "text",
text: JSON.stringify(Object.values(tasks), null, 2)
}]
};
}
case "get_task_details": {
const { task_id } = request.params.arguments as { task_id: string };
debug(`Getting details for task ${task_id}`);
const task = tasks[task_id];
if (!task) {
throw new McpError(ErrorCode.InvalidRequest, `Task ${task_id} not found`);
}
return {
content: [{
type: "text",
text: JSON.stringify(task, null, 2)
}]
};
}
default:
throw new McpError(ErrorCode.MethodNotFound, "Unknown tool");
}
} catch (error) {
debug('Error handling request:', error);
throw error;
}
});
// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
debug(`Listing tools`);
return {
tools: [
{
name: "create_task",
description: "Create a new task",
inputSchema: {
type: "object",
properties: {
id: {
type: "string",
description: "Unique identifier for the task"
},
description: {
type: "string",
description: "Description of the task"
},
dependencies: {
type: "array",
items: {
type: "string"
},
description: "IDs of tasks that must be completed first"
}
},
required: ["id", "description"]
}
},
{
name: "update_task",
description: "Update an existing pending task",
inputSchema: {
type: "object",
properties: {
task_id: {
type: "string",
description: "ID of the task to update"
},
description: {
type: "string",
description: "New description for the task"
},
dependencies: {
type: "array",
items: {
type: "string"
},
description: "New list of dependency task IDs"
}
},
required: ["task_id"]
}
},
{
name: "delete_task",
description: "Delete a task if it has no dependents",
inputSchema: {
type: "object",
properties: {
task_id: {
type: "string",
description: "ID of the task to delete"
}
},
required: ["task_id"]
}
},
{
name: "get_next_task",
description: "Get the next available task",
inputSchema: {
type: "object",
properties: {
instance_id: {
type: "string",
description: "ID of the instance requesting work"
}
},
required: ["instance_id"]
}
},
{
name: "complete_task",
description: "Mark a task as completed",
inputSchema: {
type: "object",
properties: {
task_id: {
type: "string",
description: "ID of the task to complete"
},
instance_id: {
type: "string",
description: "ID of the instance completing the task"
},
result: {
type: "string",
description: "Result or output from the task"
}
},
required: ["task_id", "instance_id", "result"]
}
},
{
name: "get_task_status",
description: "Get status of all tasks",
inputSchema: {
type: "object",
properties: {},
required: []
}
},
{
name: "get_task_details",
description: "Get details of a specific task",
inputSchema: {
type: "object",
properties: {
task_id: {
type: "string",
description: "ID of the task to get details for"
}
},
required: ["task_id"]
}
}
]
};
});
// Start server
async function main() {
try {
loadTasks(); // Load tasks from file
const transport = new StdioServerTransport();
await server.connect(transport);
debug('MCP server running on stdio');
} catch (error) {
console.error('Failed to start server:', error);
process.exit(1);
}
}
main();