
tasksync-mcp

index.ts (22.3 kB)
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  ToolSchema,
  RootsListChangedNotificationSchema,
  type Root,
} from "@modelcontextprotocol/sdk/types.js";
import fs from "fs/promises";
import { createReadStream, watch, FSWatcher } from "fs";
import path from "path";
import { z } from "zod";
import { zodToJsonSchema } from "zod-to-json-schema";
import express from "express";
import { normalizePath, expandHome } from './path-utils.js';
import { getValidRootDirectories } from './roots-utils.js';
import {
  validatePath,
  readFileContent,
  tailFile,
  headFile,
  setAllowedDirectories,
} from './lib.js';

// Parse configuration from environment variables or command line
// Command line argument parsing with embedded mode support
const args = process.argv.slice(2);
const useSSE = args.includes('--sse');
const ssePort = parseInt(args.find(arg => arg.startsWith('--port='))?.split('=')[1] || '3001');
const directoryArgs = args.filter(arg =>
  !['--sse', '--embedded', '--stdio'].some(flag => arg.startsWith(flag)) &&
  !arg.startsWith('--port=')
);

// Store allowed directories in normalized and resolved form
let allowedDirectories: string[] = [];

// Auto-detect current directory if no directories provided
allowedDirectories = directoryArgs.length === 0
  ? [normalizePath(process.cwd())]
  : await Promise.all(directoryArgs.map(async (dir) => {
      const absolute = path.resolve(expandHome(dir));
      try {
        return normalizePath(await fs.realpath(absolute));
      } catch {
        return normalizePath(absolute);
      }
    }));

directoryArgs.length === 0 && console.error(`Auto-detected allowed directory: ${allowedDirectories[0]}`);

// Validate that all directories exist and are accessible
await Promise.all(allowedDirectories.map(async (dir) => {
  try {
    const stats = await fs.stat(dir);
    if (!stats.isDirectory()) throw new Error(`${dir} is not a directory`);
  } catch (error) {
    console.error(`Error accessing directory ${dir}:`, error);
    process.exit(1);
  }
}));

// Initialize the global allowedDirectories in lib.ts
setAllowedDirectories(allowedDirectories);

// File watching state for check_review (support multiple files)
const lastFileModifiedByPath: Map<string, number> = new Map();
const fileWatchers: Map<string, FSWatcher> = new Map();
const connectedTransports: Set<SSEServerTransport> = new Set();

// Waiting mechanism for check_review
const waitingForFileChange: Array<{
  path: string;
  resolve: (content: string) => void;
  reject: (error: Error) => void;
}> = [];

// Lazy initialization state
let isInitialized = false;

// Schema definitions
const AskReviewArgsSchema = z.object({
  path: z.string().optional().describe('Absolute or relative path to the feedback file within allowed directories. Defaults to feedback.md in the current working directory.'),
  tail: z.number().optional().describe('If provided, returns only the last N lines of the review file'),
  head: z.number().optional().describe('If provided, returns only the first N lines of the review file')
});

const ReadImageFileArgsSchema = z.object({
  path: z.string()
});

const ToolInputSchema = ToolSchema.shape.inputSchema;
type ToolInput = z.infer<typeof ToolInputSchema>;

function formatResponseWithHeadTail(content: string, head?: number, tail?: number): { content: { type: "text", text: string }[] } {
  if (head && tail) {
    throw new Error("Cannot specify both head and tail parameters simultaneously");
  }
  if (tail) {
    const lines = content.split('\n');
    const tailLines = lines.slice(-tail);
    return {
      content: [{ type: "text", text: tailLines.join('\n') }],
    };
  }
  if (head) {
    const lines = content.split('\n');
    const headLines = lines.slice(0, head);
    return {
      content: [{ type: "text", text: headLines.join('\n') }],
    };
  }
  return {
    content: [{ type: "text", text: content }],
  };
}

// Server setup
const server = new Server(
  {
    name: "tasksync-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
      logging: {}, // Enable logging for notifications
    },
  },
);

// Lazy initialization for file watcher
async function ensureInitialized() {
  if (isInitialized) return;
  console.error("Initializing TaskSync server components...");
  await setupFileWatcher(path.join(process.cwd(), 'feedback.md'), true);
  isInitialized = true;
  console.error("TaskSync server initialized successfully");
}

// File watching functions
async function setupFileWatcher(filePath: string, createIfMissing: boolean = false) {
  try {
    // Check if file exists, create if it doesn't
    try {
      await fs.access(filePath);
      console.error(`File exists: ${filePath}`);
    } catch {
      if (!createIfMissing) {
        console.error(`File does not exist, skipping watcher: ${filePath}`);
        return;
      }
      console.error(`Creating file: ${filePath}`);
      await fs.mkdir(path.dirname(filePath), { recursive: true });
      await fs.writeFile(filePath, 'No review content yet.');
    }

    // Get initial modification time and setup watcher
    const stats = await fs.stat(filePath);
    const mtime = stats.mtime.getTime();
    lastFileModifiedByPath.set(filePath, mtime);
    console.error(`Initial file modification time for ${filePath}: ${mtime}`);

    if (!fileWatchers.has(filePath)) {
      const watcher = watch(filePath, async (eventType) => {
        console.error(`File watcher event: ${eventType} for ${filePath}`);
        eventType === 'change' && await notifyClientsOfFileChange(filePath);
      });
      fileWatchers.set(filePath, watcher);
      console.error(`File watcher setup successfully for: ${filePath}`);
    }
  } catch (error) {
    console.error(`Failed to setup file watcher: ${error}`);
    throw error;
  }
}

async function notifyClientsOfFileChange(filePath: string) {
  try {
    const stats = await fs.stat(filePath);
    const currentModified = stats.mtime.getTime();
    if (lastFileModifiedByPath.get(filePath) === currentModified) return;

    console.error(`File change detected for ${filePath}: ${lastFileModifiedByPath.get(filePath)} -> ${currentModified}`);
    lastFileModifiedByPath.set(filePath, currentModified);
    const content = await readFileContent(filePath);

    // Resolve waiting calls and filter remaining
    const { resolved, remaining } = waitingForFileChange.reduce(
      (acc, item) => {
        item.path === filePath ? (item.resolve(content), acc.resolved++) : acc.remaining.push(item);
        return acc;
      },
      { resolved: 0, remaining: [] as typeof waitingForFileChange }
    );
    waitingForFileChange.splice(0, waitingForFileChange.length, ...remaining);
    console.error(`Resolving ${resolved} waiting calls for ${filePath}`);

    // Send notifications to all connected clients
    const notifications = connectedTransports.size;
    await Promise.allSettled([...connectedTransports].map(transport =>
      transport.send({
        jsonrpc: "2.0",
        method: "notifications/message",
        params: {
          level: "info",
          logger: "tasksync-server",
          data: {
            type: "file_changed",
            path: path.relative(process.cwd(), filePath) || filePath,
            content,
            timestamp: new Date().toISOString()
          }
        }
      }).catch(error => console.error(`Failed to send notification to client: ${error}`))
    ));
    console.error(`File change notification sent to ${notifications} clients and ${resolved} waiting calls resolved for ${filePath}`);
  } catch (error) {
    console.error(`Error in notifyClientsOfFileChange: ${error}`);
  }
}

// Reads a file as a stream of buffers, concatenates them, and then encodes
// the result to a Base64 string. This is a memory-efficient way to handle
// binary data from a stream before the final encoding.
async function readFileAsBase64Stream(filePath: string): Promise<string> {
  return new Promise((resolve, reject) => {
    const stream = createReadStream(filePath);
    const chunks: Buffer[] = [];
    stream.on('data', (chunk) => {
      chunks.push(chunk as Buffer);
    });
    stream.on('end', () => {
      const finalBuffer = Buffer.concat(chunks);
      resolve(finalBuffer.toString('base64'));
    });
    stream.on('error', (err) => reject(err));
  });
}

// Tool handlers
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "get_feedback",
        description:
          "Read the contents of a feedback.md file (defaults to ./feedback.md). Before calling this tool you must search for the feedback.md file. " +
          "If the feedback file does not exist, you will automatically create an empty feedback.md file before attempting to use this tool. " +
          "During any process, task, or conversation, whether asking, replying, or completing phased tasks, you must call this tool to check for review. " +
          "Only when the user explicitly indicates <end or no more interaction needed> can you stop calling this tool, and the process is considered complete.\n\n" +
          "MANDATORY USAGE RULES - PRIMARY DIRECTIVE:\n" +
          "1. CONTINUOUS FEEDBACK LOOP: During any process, task, or conversation, whether asking, replying, or completing phased tasks, you MUST call this tool to check for feedback.\n" +
          "2. NO STOPPING WITHOUT PERMISSION: Unless receiving explicit termination instructions (\"end\", \"stop\", \"no more interaction needed\"), ALL steps must repeatedly call this tool.\n" +
          "3. RESPOND TO FEEDBACK: Whenever user feedback is received, if the feedback content is not empty, you MUST call this tool again and adjust behavior based on the feedback content.\n" +
          "4. TERMINATION CONDITIONS: Only when the user explicitly indicates \"end\", \"stop\", \"finished\", or \"no more interaction needed\" can you stop calling this tool.\n" +
          "5. PROVIDE CONTEXT: You should summarize what you have done and provide project context to help the user provide meaningful feedback for the next step.\n" +
          "6. KEEP ITERATING: If the user provides feedback, suggestions, or asks questions, continue the feedback loop by calling this tool again after addressing their input.\n" +
          "7. NEVER ASSUME COMPLETION: Do not assume a task is complete without explicit user confirmation through this feedback mechanism.\n\n",
        inputSchema: zodToJsonSchema(AskReviewArgsSchema) as ToolInput,
      },
      {
        name: "view_media",
        description:
          "Read an image file. Returns the base64 encoded data and MIME type. " +
          "Only works within allowed directories.\n\n" +
          "SUPPORTED FORMATS:\n" +
          "Images: PNG, JPEG, GIF, WebP, BMP, SVG\n\n" +
          "USAGE:\n" +
          "Use this tool to read and encode image files for analysis, display, or processing. " +
          "The tool streams files efficiently and returns base64-encoded data with proper MIME type detection.\n\n" +
          "Args:\n" +
          " path: Absolute or relative path to the image file within allowed directories",
        inputSchema: zodToJsonSchema(ReadImageFileArgsSchema) as ToolInput,
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    // Ensure server is initialized on first tool call
    await ensureInitialized();

    const { name, arguments: args } = request.params;

    switch (name) {
      case "get_feedback": {
        const parsed = AskReviewArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for get_feedback: ${parsed.error}`);
        }

        // Determine path: use provided path if any, else default to ./feedback.md
        const targetPath = parsed.data.path || path.join(process.cwd(), 'feedback.md');

        // Create feedback.md file if it doesn't exist (only for default feedback.md, not custom paths)
        if (!parsed.data.path) {
          try {
            await fs.access(targetPath);
          } catch (error) {
            if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
              try {
                await fs.writeFile(targetPath, '', 'utf-8');
                console.error(`Created missing feedback file: ${targetPath}`);
              } catch (writeError) {
                console.error(`Failed to create feedback file: ${writeError}`);
                throw new Error(`Could not create feedback file: ${targetPath}`);
              }
            }
          }
        }

        const validPath = await validatePath(targetPath);
        const stats = await fs.stat(validPath);
        const currentModified = stats.mtime.getTime();

        // Ensure a watcher exists for this path (do not create file if missing)
        await setupFileWatcher(validPath, false);

        const lastKnown = lastFileModifiedByPath.get(validPath) ?? null;
        console.error(`check_review: Current file modified: ${currentModified}, Last known: ${lastKnown}`);

        // If this is the first call or file has changed, return content immediately
        if (lastKnown === null || lastKnown < currentModified) {
          console.error("check_review: File has changed, returning content immediately");
          lastFileModifiedByPath.set(validPath, currentModified);
          if (parsed.data.head && parsed.data.tail) {
            throw new Error("Cannot specify both head and tail parameters simultaneously");
          }
          const content = parsed.data.tail
            ? await tailFile(validPath, parsed.data.tail)
            : parsed.data.head
              ? await headFile(validPath, parsed.data.head)
              : await readFileContent(validPath);
          return { content: [{ type: "text", text: content }] };
        }

        // File hasn't changed - wait for file change using file watcher
        console.error("check_review: File hasn't changed, waiting for modification...");
        console.error(`check_review: Current waiting queue size: ${waitingForFileChange.length}`);

        const content = await new Promise<string>((resolve, reject) => {
          const timeout = setTimeout(() => {
            const index = waitingForFileChange.findIndex(w => w.resolve === resolve);
            if (index !== -1) {
              waitingForFileChange.splice(index, 1);
            }
            console.error("check_review: Timeout reached after 5 minutes");
            reject(new Error("Timeout waiting for file change (5 minutes)"));
          }, 300000); // 5 minute timeout

          console.error("check_review: Adding to waiting queue");
          waitingForFileChange.push({
            path: validPath,
            resolve: (content: string) => {
              console.error("check_review: Promise resolved with content");
              clearTimeout(timeout);
              resolve(content);
            },
            reject: (error: Error) => {
              console.error(`check_review: Promise rejected with error: ${error.message}`);
              clearTimeout(timeout);
              reject(error);
            }
          });
          console.error(`check_review: Updated waiting queue size: ${waitingForFileChange.length}`);
        });

        return formatResponseWithHeadTail(content, parsed.data.head, parsed.data.tail);
      }

      case "view_media": {
        const parsed = ReadImageFileArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for view_media: ${parsed.error}`);
        }
        const validPath = await validatePath(parsed.data.path);
        const extension = path.extname(validPath).toLowerCase();
        const mimeTypes: Record<string, string> = {
          ".png": "image/png",
          ".jpg": "image/jpeg",
          ".jpeg": "image/jpeg",
          ".gif": "image/gif",
          ".webp": "image/webp",
          ".bmp": "image/bmp",
          ".svg": "image/svg+xml",
        };
        const mimeType = mimeTypes[extension] || "application/octet-stream";
        const data = await readFileAsBase64Stream(validPath);
        const type = mimeType.startsWith("image/") ? "image" : "blob";
        return {
          content: [{ type, data, mimeType }],
        };
      }

      default:
        throw new Error(`Unknown tool: ${name}`);
    }
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    return {
      content: [{ type: "text", text: `Error: ${errorMessage}` }],
      isError: true,
    };
  }
});

// Updates allowed directories based on MCP client roots
async function updateAllowedDirectoriesFromRoots(requestedRoots: Root[]) {
  const validatedRootDirs = await getValidRootDirectories(requestedRoots);
  if (validatedRootDirs.length > 0) {
    allowedDirectories = [...validatedRootDirs];
    setAllowedDirectories(allowedDirectories); // Update the global state in lib.ts
    console.error(`Updated allowed directories from MCP roots: ${validatedRootDirs.length} valid directories`);
  } else {
    console.error("No valid root directories provided by client");
  }
}

// Handles dynamic roots updates during runtime: when the client sends a "roots/list_changed"
// notification, the server fetches the updated roots and replaces all allowed directories with them.
server.setNotificationHandler(RootsListChangedNotificationSchema, async () => {
  try {
    // Request the updated roots list from the client
    const response = await server.listRoots();
    if (response && 'roots' in response) {
      await updateAllowedDirectoriesFromRoots(response.roots);
    }
  } catch (error) {
    console.error("Failed to request roots from client:", error instanceof Error ? error.message : String(error));
  }
});

// Handles post-initialization setup, specifically checking for and fetching MCP roots.
server.oninitialized = async () => {
  const clientCapabilities = server.getClientCapabilities();

  if (clientCapabilities?.roots) {
    try {
      const response = await server.listRoots();
      if (response && 'roots' in response) {
        await updateAllowedDirectoriesFromRoots(response.roots);
      } else {
        console.error("Client returned no roots set, keeping current settings");
      }
    } catch (error) {
      console.error("Failed to request initial roots from client:", error instanceof Error ? error.message : String(error));
    }
  } else {
    if (allowedDirectories.length > 0) {
      console.error("Client does not support MCP Roots, using allowed directories set from server args:", allowedDirectories);
    } else {
      throw new Error(`Server cannot operate: No allowed directories available. Server was started without command-line directories and client either does not support MCP roots protocol or provided empty roots. Please either: 1) Start server with directory arguments, or 2) Use a client that supports MCP roots protocol and provides valid root directories.`);
    }
  }
};

// Start server
async function runServer() {
  if (useSSE) {
    await runSSEServer();
  } else {
    await runStdioServer();
  }
}

async function runStdioServer() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("TaskSync MCP Server running on stdio");
  console.error(`Allowed directories: ${allowedDirectories.join(', ')}`);
  console.error("Server will initialize components when first tool is called");
}

async function runSSEServer() {
  const app = express();

  // Enable CORS for development
  app.use((req, res, next) => {
    res.header('Access-Control-Allow-Origin', '*');
    res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
    res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization');
    if (req.method === 'OPTIONS') {
      res.sendStatus(200);
    } else {
      next();
    }
  });

  app.use(express.json());

  // SSE endpoint
  app.get("/sse", async (_, res) => {
    const transport = new SSEServerTransport('/messages', res);
    connectedTransports.add(transport);

    res.on("close", () => {
      connectedTransports.delete(transport);
      console.error(`Client disconnected. Active connections: ${connectedTransports.size}`);
    });

    console.error(`Client connected via SSE. Active connections: ${connectedTransports.size}`);
    await server.connect(transport);
  });

  // Messages endpoint for POST requests
  app.post("/messages", async (req, res) => {
    const sessionId = req.query.sessionId as string;
    const transport = Array.from(connectedTransports).find(t => t.sessionId === sessionId);
    if (transport) {
      await transport.handlePostMessage(req, res);
    } else {
      res.status(400).send("No transport found for sessionId");
    }
  });

  // Health check endpoint
  app.get("/health", (_, res) => {
    res.json({
      status: "ok",
      server: "tasksync-mcp",
      version: "1.0.0",
      connections: connectedTransports.size,
      allowedDirectories: allowedDirectories.length
    });
  });

  // Setup file watcher for default review file
  await setupFileWatcher(path.join(process.cwd(), 'feedback.md'), true);

  app.listen(ssePort, () => {
    console.error(`TaskSync MCP Server running on SSE at http://localhost:${ssePort}`);
    console.error(`SSE endpoint: http://localhost:${ssePort}/sse`);
    console.error(`Health check: http://localhost:${ssePort}/health`);
    console.error(`Allowed directories: ${allowedDirectories.join(', ')}`);
    console.error(`File watcher active for: feedback.md`);
  });
}

// Cleanup on exit
process.on('SIGINT', () => {
  console.error('\nShutting down server...');
  for (const [watchedPath, watcher] of fileWatchers.entries()) {
    try {
      watcher.close();
      console.error(`Closed watcher for ${watchedPath}`);
    } catch {}
  }
  process.exit(0);
});

runServer().catch((error) => {
  console.error("Fatal error running TaskSync server:", error);
  process.exit(1);
});
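Usage sketch (illustrative, not part of index.ts): the snippet below shows one way to exercise the get_feedback tool over stdio from the client side of the same @modelcontextprotocol/sdk the server imports. The built entry point dist/index.js, the project directory, and the client name are assumptions chosen for the example.

// client-sketch.ts — hypothetical example; adjust paths to your build output.
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

async function main() {
  // Spawn the TaskSync server over stdio, allowing a single project directory (assumed path).
  const transport = new StdioClientTransport({
    command: "node",
    args: ["dist/index.js", "/path/to/project"],
  });

  const client = new Client({ name: "tasksync-client-sketch", version: "0.0.1" });
  await client.connect(transport);

  // The first call returns the current feedback.md content immediately;
  // later calls block (up to 5 minutes) until the file changes on disk.
  const result = await client.callTool({
    name: "get_feedback",
    arguments: { tail: 20 }, // optional: only the last 20 lines
  });
  console.log(result.content);

  await client.close();
}

main().catch(console.error);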
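Feedback-loop note (illustrative): when feedback.md has not changed since the last read, the get_feedback handler parks the request in waitingForFileChange for up to five minutes; any write that bumps the file's mtime fires the fs.watch callback, and notifyClientsOfFileChange resolves every pending call with the new content. A minimal sketch of triggering that from outside the server, assuming the server's default feedback.md in its working directory:

// feedback-trigger.ts — hypothetical illustration of unblocking a waiting get_feedback call.
import fs from "fs/promises";
import path from "path";

// Default path used by the server when no explicit path argument is given (assumed same cwd).
const feedbackPath = path.join(process.cwd(), "feedback.md");

// Appending a review note changes the file's mtime, which resolves pending get_feedback promises.
await fs.appendFile(feedbackPath, "\nPlease add unit tests for the SSE transport.\n", "utf-8");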
