// batch-processor-tool.ts
import { z } from 'zod';
import { BaseTool, ToolDefinition, ToolResponse } from './base-tool.js';
import { OpenAIClient, OpenAIRequest } from '../openai-client.js';
import { PromptOptimizer } from '../prompt-optimizer.js';
const BatchProcessorSchema = z.object({
  prompts: z.array(z.string()).min(1).describe('Array of prompts to process (at least one required)'),
model: z.string().optional().describe('Model to use for all prompts'),
taskType: z.enum(['analysis', 'generation', 'reasoning', 'coding']).optional().describe('Task type for optimization'),
parallel: z.boolean().optional().describe('Process prompts in parallel'),
maxConcurrency: z.number().min(1).max(10).optional().describe('Maximum concurrent requests'),
temperature: z.number().min(0).max(2).optional().describe('Temperature for all requests'),
maxTokens: z.number().positive().optional().describe('Max tokens per response')
});
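// Example (hypothetical) payload accepted by BatchProcessorSchema —
// the model string is an assumption; the schema accepts any string:
//
//   BatchProcessorSchema.parse({
//     prompts: ['Explain semaphores', 'Explain backpressure'],
//     taskType: 'analysis',
//     parallel: true,
//     maxConcurrency: 3,
//     temperature: 0.7
//   });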
interface BatchResult {
index: number;
prompt: string;
response: string;
success: boolean;
error?: string;
tokens?: number;
processingTime: number;
}
export class BatchProcessorTool extends BaseTool {
readonly definition: ToolDefinition = {
name: 'process_batch_prompts',
description: 'Process multiple prompts in batch with parallel execution support',
inputSchema: BatchProcessorSchema
};
constructor(
private openaiClient: OpenAIClient,
private promptOptimizer: PromptOptimizer
) {
super();
}
async execute(args: z.infer<typeof BatchProcessorSchema>): Promise<ToolResponse> {
try {
const startTime = Date.now();
const { prompts, parallel = true, maxConcurrency = 5 } = args;
let results: BatchResult[];
if (parallel && prompts.length > 1) {
results = await this.processParallel(prompts, args, maxConcurrency);
} else {
results = await this.processSequential(prompts, args);
}
const totalTime = Date.now() - startTime;
const summary = this.generateSummary(results, totalTime);
return this.createSuccessResponse(summary);
} catch (error) {
return this.createErrorResponse(`Batch processing failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
}
}
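  /**
   * Runs prompts concurrently, bounded by the Semaphore below so at most
   * `maxConcurrency` requests are in flight at once. Results are written
   * back by index, so output order always matches input order.
   */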
private async processParallel(
prompts: string[],
args: z.infer<typeof BatchProcessorSchema>,
maxConcurrency: number
): Promise<BatchResult[]> {
const results: BatchResult[] = [];
const semaphore = new Semaphore(maxConcurrency);
const promises = prompts.map(async (prompt, index) => {
await semaphore.acquire();
try {
const result = await this.processSinglePrompt(prompt, index, args);
results[index] = result;
} finally {
semaphore.release();
}
});
await Promise.all(promises);
return results;
}
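  // Fallback path: processes prompts strictly one at a time, in order.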
private async processSequential(
prompts: string[],
args: z.infer<typeof BatchProcessorSchema>
): Promise<BatchResult[]> {
const results: BatchResult[] = [];
for (let i = 0; i < prompts.length; i++) {
const result = await this.processSinglePrompt(prompts[i], i, args);
results.push(result);
}
return results;
}
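  /**
   * Sends a single prompt through the OpenAI client, optionally prefixed
   * with an optimized system prompt. Failures are caught and recorded per
   * prompt so one bad request cannot abort the rest of the batch.
   */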
private async processSinglePrompt(
prompt: string,
index: number,
args: z.infer<typeof BatchProcessorSchema>
): Promise<BatchResult> {
const startTime = Date.now();
try {
const systemPrompt = args.taskType
? this.promptOptimizer.optimizeSystemPrompt({
taskType: args.taskType,
optimizationLevel: 'balanced'
})
: undefined;
const request: OpenAIRequest = {
prompt,
systemPrompt,
model: args.model,
temperature: args.temperature,
maxTokens: args.maxTokens
};
const response = await this.openaiClient.chat(request);
const processingTime = Date.now() - startTime;
return {
index,
prompt,
response: response.content,
success: true,
tokens: response.usage?.totalTokens,
processingTime
};
} catch (error) {
const processingTime = Date.now() - startTime;
return {
index,
prompt,
response: '',
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
processingTime
};
}
}
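  // Builds the human-readable markdown report returned to the caller:
  // aggregate stats, failed requests, truncated per-prompt results, and
  // a rough cost estimate.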
private generateSummary(results: BatchResult[], totalTime: number): string {
const successful = results.filter(r => r.success).length;
const failed = results.length - successful;
const totalTokens = results.reduce((sum, r) => sum + (r.tokens || 0), 0);
const avgProcessingTime = results.reduce((sum, r) => sum + r.processingTime, 0) / results.length;
let summary = `🔄 **Batch Processing Complete**\n\n`;
summary += `📊 **Summary**:\n`;
summary += `- Total prompts: ${results.length}\n`;
summary += `- Successful: ${successful}\n`;
summary += `- Failed: ${failed}\n`;
summary += `- Total tokens used: ${totalTokens.toLocaleString()}\n`;
summary += `- Total time: ${(totalTime / 1000).toFixed(2)}s\n`;
summary += `- Average per prompt: ${(avgProcessingTime / 1000).toFixed(2)}s\n\n`;
if (failed > 0) {
summary += `❌ **Failed Requests**:\n`;
results
.filter(r => !r.success)
.forEach(r => {
summary += `- Prompt ${r.index + 1}: ${r.error}\n`;
});
summary += `\n`;
}
summary += `✅ **Results**:\n\n`;
results
.filter(r => r.success)
.forEach(r => {
const truncatedPrompt = r.prompt.length > 50
? r.prompt.substring(0, 50) + '...'
: r.prompt;
const truncatedResponse = r.response.length > 100
? r.response.substring(0, 100) + '...'
: r.response;
summary += `**Prompt ${r.index + 1}**: "${truncatedPrompt}"\n`;
summary += `**Response**: ${truncatedResponse}\n`;
summary += `**Tokens**: ${r.tokens || 'N/A'} | **Time**: ${(r.processingTime / 1000).toFixed(2)}s\n\n`;
});
    // Rough cost estimate: applies a flat $0.03 per 1K tokens to the total
    // (prompt + completion) token count; actual pricing varies by model.
    const estimatedCost = (totalTokens * 0.03 / 1000).toFixed(4);
    summary += `💰 **Estimated Cost**: ~$${estimatedCost} (flat-rate approximation)`;
return summary;
}
}
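// Usage sketch (hypothetical wiring; how `openaiClient` and
// `promptOptimizer` are constructed is an assumption, not part of this file):
//
//   const tool = new BatchProcessorTool(openaiClient, promptOptimizer);
//   const response = await tool.execute({
//     prompts: ['Summarize the release notes', 'Draft a changelog entry'],
//     taskType: 'generation',
//     parallel: true,
//     maxConcurrency: 3
//   });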
// Simple semaphore implementation for concurrency control
class Semaphore {
private permits: number;
private waitQueue: Array<() => void> = [];
constructor(permits: number) {
this.permits = permits;
}
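  // Resolves immediately while permits remain; otherwise parks the caller's
  // resolver in the wait queue until release() hands over a permit.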
async acquire(): Promise<void> {
return new Promise((resolve) => {
if (this.permits > 0) {
this.permits--;
resolve();
} else {
this.waitQueue.push(resolve);
}
});
}
  release(): void {
    // Hand the permit directly to the next waiter if one exists;
    // otherwise return it to the pool.
    const next = this.waitQueue.shift();
    if (next) {
      next();
    } else {
      this.permits++;
    }
  }
}
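// Usage sketch: the same acquire/try/finally pattern used in
// processParallel above, applied to an arbitrary async task list.
//
//   const sem = new Semaphore(2);
//   await Promise.all(tasks.map(async (task) => {
//     await sem.acquire();
//     try {
//       await task();
//     } finally {
//       sem.release();
//     }
//   }));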