/**
* Stage-Specific Worker Process
*
* Each worker:
* 1. Runs on its own port with its own MCP server
* 2. Polls Server A for tasks at its assigned stage
* 3. Executes tasks using Gemini CLI with stage-specific prompts
* 4. Reports results back to Server A
*
* Environment Variables:
* - WORKER_STAGE: research | planning | implementation | testing
* - WORKER_PORT: Port to run on (3001-3004)
* - SERVER_URL: URL of Server A (default: http://localhost:3000)
*/
import 'dotenv/config';
import { exec } from 'child_process';
import { promisify } from 'util';
import http from 'http';
const execAsync = promisify(exec);
// Configuration from environment
const STAGE = process.env.WORKER_STAGE || 'implementation';
const PORT = parseInt(process.env.WORKER_PORT || '3001');
const SERVER_URL = process.env.SERVER_URL || 'http://localhost:3000';
const POLL_INTERVAL = parseInt(process.env.POLL_INTERVAL || '3000'); // 3 seconds
// Stage-specific prompts for Gemini
const STAGE_PROMPTS = {
research: `You are a Research Agent. Your job is to:
1. Research options and approaches for the given task
2. List pros and cons of each approach
3. Make a recommendation
Output your research in a structured format. Be thorough but concise.`,
planning: `You are a Planning Agent. Your job is to:
1. Review the research/context provided
2. Create a detailed implementation plan
3. Break down into specific steps
4. Identify any risks or dependencies
Output a clear, actionable plan.`,
implementation: `You are an Implementation Agent. Your job is to:
1. Review the plan provided
2. Execute the implementation using available tools
3. Write/modify code as needed
4. Report what was done
Use the MCP tools available to you (file operations, shell commands, etc.)`,
testing: `You are a Testing Agent. Your job is to:
1. Review what was implemented
2. Run tests or verify the implementation works
3. Check for bugs or issues
4. Report pass/fail with details
Be thorough in validation.`
};
let isRunning = true;
let currentTask = null;
/**
* Fetch next task from Server A queue
*/
async function fetchNextTask() {
try {
const response = await fetch(`${SERVER_URL}/api/queue/next?stage=${STAGE}`);
if (!response.ok) {
if (response.status === 404) {
return null; // No tasks available
}
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
return await response.json();
} catch (error) {
console.error(`[Worker:${STAGE}] Error fetching task:`, error.message);
return null;
}
}
/**
* Report stage completion to Server A
*/
async function reportResult(taskId, result, success = true) {
try {
const response = await fetch(`${SERVER_URL}/api/queue/result`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
taskId,
stage: STAGE,
result,
success
})
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
console.log(`[Worker:${STAGE}] Reported result for task ${taskId}`);
return true;
} catch (error) {
console.error(`[Worker:${STAGE}] Error reporting result:`, error.message);
return false;
}
}
/**
* Execute task using Gemini CLI (prompt mode - no MCP)
* Uses 'gemini -p' for one-shot prompt without spawning MCP server
*/
async function executeTask(pipelineTask) {
const { taskId, taskData, stageResults } = pipelineTask;
const stagePrompt = STAGE_PROMPTS[STAGE];
// Build context from previous stages
let context = '';
if (stageResults && Object.keys(stageResults).length > 0) {
context = '\n\n=== Previous Stage Results ===\n';
for (const [stage, result] of Object.entries(stageResults)) {
context += `\n[${stage.toUpperCase()}]:\n${result}\n`;
}
}
// Build the full prompt
const fullPrompt = `${stagePrompt}
=== Task ===
Title: ${taskData.title || 'Unknown Task'}
Description: ${taskData.description || 'No description'}
${context}
Please complete this ${STAGE} stage now. Provide a clear, structured response.`;
console.log(`[Worker:${STAGE}] Executing task: "${taskData.title || taskId}"`);
try {
// Use 'gemini -p' for prompt mode (no MCP server spawn)
// Escape the prompt for shell
const escapedPrompt = fullPrompt.replace(/"/g, '\\"').replace(/\n/g, '\\n');
const { stdout, stderr } = await execAsync(
`echo "${escapedPrompt}" | gemini`,
{
cwd: process.cwd(),
timeout: 300000, // 5 minute timeout
maxBuffer: 10 * 1024 * 1024, // 10MB buffer
shell: true
}
);
const result = stdout.trim() || stderr.trim() || 'Task completed (no output)';
console.log(`[Worker:${STAGE}] Task completed: ${taskId}`);
return { success: true, result };
} catch (error) {
console.error(`[Worker:${STAGE}] Task execution failed:`, error.message);
return { success: false, result: `Error: ${error.message}` };
}
}
/**
* Main polling loop
*/
async function pollLoop() {
console.log(`[Worker:${STAGE}] Starting poll loop (interval: ${POLL_INTERVAL}ms)...`);
while (isRunning) {
try {
// Check for next task
const pipelineTask = await fetchNextTask();
if (pipelineTask) {
currentTask = pipelineTask;
console.log(`[Worker:${STAGE}] Got task: ${pipelineTask.taskId}`);
// Execute the task
const { success, result } = await executeTask(pipelineTask);
// Report result
await reportResult(pipelineTask.taskId, result, success);
currentTask = null;
}
} catch (error) {
console.error(`[Worker:${STAGE}] Poll loop error:`, error);
}
// Wait before next poll
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL));
}
}
/**
* Start a simple HTTP server for health checks
*/
function startHealthServer() {
const server = http.createServer((req, res) => {
if (req.url === '/health') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
stage: STAGE,
port: PORT,
status: isRunning ? 'running' : 'stopped',
currentTask: currentTask?.taskId || null
}));
} else {
res.writeHead(404);
res.end('Not Found');
}
});
server.listen(PORT, () => {
console.log(`[Worker:${STAGE}] Health server running on port ${PORT}`);
});
server.on('error', (e) => {
if (e.code === 'EADDRINUSE') {
console.error(`[Worker:${STAGE}] Port ${PORT} in use, running without health server`);
} else {
console.error(`[Worker:${STAGE}] Server error:`, e);
}
});
return server;
}
/**
* Graceful shutdown
*/
function setupShutdown() {
const shutdown = () => {
console.log(`[Worker:${STAGE}] Shutting down...`);
isRunning = false;
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
// Main entry point
console.log(`[Worker:${STAGE}] Starting worker...`);
console.log(`[Worker:${STAGE}] Stage: ${STAGE}, Port: ${PORT}, Server: ${SERVER_URL}`);
setupShutdown();
startHealthServer();
pollLoop();