/**
* LLM Adapter for Claude Code integration
* Handles process pool management, retry logic, and Claude CLI execution
*/
import { spawn, ChildProcess } from 'child_process';
import { EventEmitter } from 'events';
import {
ExecutionTrajectory,
ReflectionAnalysis,
PromptImprovement,
LLMResponse,
} from '../types/gepa';
import { MemoryLeakIntegration } from '../core/memory-leak-detector';
import {
executeLLMWithResilience
} from '../core/resilience/index';
/**
* Configuration options for the LLM adapter
*/
export interface LLMAdapterConfig {
maxConcurrentProcesses?: number;
processTimeout?: number;
maxRetries?: number;
retryBaseDelay?: number;
executable?: string;
workingDir?: string;
env?: Record<string, string>;
}
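// Illustrative config sketch: the values below are assumptions for local
// development, not project-endorsed defaults (the constructor's defaults are
// 3 processes, a 5 minute timeout, 3 retries, and a 1 s base delay).
//
//   const devConfig: LLMAdapterConfig = {
//     maxConcurrentProcesses: 2,
//     processTimeout: 120_000, // fail faster while iterating
//     maxRetries: 2,
//     retryBaseDelay: 500,
//   };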
/**
* Response from Claude Code CLI
*/
export interface ClaudeResponse {
response: string;
usage?: {
inputTokens: number;
outputTokens: number;
};
processingTime: number;
}
/**
* Request in the process pool queue
*/
interface QueuedRequest {
prompt: string;
resolve: (value: ClaudeResponse) => void;
reject: (error: Error) => void;
timestamp: number;
}
/**
* Active process information
*/
interface ProcessInfo {
process: ChildProcess;
startTime: number;
timeout: ReturnType<typeof setTimeout>;
handled: boolean;
}
/**
* LLM Adapter for Claude Code integration
* Manages process pool, retries, and Claude CLI execution
*/
export class LLMAdapter extends EventEmitter {
private readonly maxConcurrentProcesses: number;
private readonly processTimeout: number;
private readonly maxRetries: number;
private readonly retryBaseDelay: number;
private readonly executable: string;
private readonly workingDir: string;
private readonly env: Record<string, string>;
private activeProcesses = new Set<ProcessInfo>();
private requestQueue: QueuedRequest[] = [];
  private isShuttingDown = false;
  private memoryCheckInterval?: ReturnType<typeof setInterval>;
constructor(config: LLMAdapterConfig = {}) {
super();
this.maxConcurrentProcesses = config.maxConcurrentProcesses ?? 3;
this.processTimeout = config.processTimeout ?? 300000; // 5 minutes
this.maxRetries = config.maxRetries ?? 3;
this.retryBaseDelay = config.retryBaseDelay ?? 1000;
this.executable = config.executable ?? 'claude';
this.workingDir = config.workingDir ?? process.cwd();
this.env = { ...process.env, ...config.env } as Record<string, string>;
this.setupMemoryLeakDetection();
}
/**
* Generate response using Claude Code CLI with resilience protection
*/
async generateResponse(prompt: string): Promise<ClaudeResponse> {
if (this.isShuttingDown) {
throw new Error('Adapter is shutting down');
}
// Use full resilience protection for LLM operations
return await executeLLMWithResilience(
async () => {
return this.executeWithRetry(prompt);
},
{
context: {
name: 'llm-generate-response',
priority: 'high'
}
}
);
}
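  // Usage sketch (illustrative): callers just await the result; queueing,
  // retry, and resilience wrapping all happen behind this call.
  //
  //   const adapter = new LLMAdapter();
  //   const reply = await adapter.generateResponse('Explain this stack trace');
  //   console.log(reply.response, `${reply.processingTime}ms`);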
/**
* Execute request with retry logic and exponential backoff
*/
private async executeWithRetry(prompt: string): Promise<ClaudeResponse> {
    let lastError: Error | null = null;
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        return await this.executeRequest(prompt);
      } catch (error) {
        lastError = error as Error;
        // Don't retry non-transient errors (missing executable, spawn
        // failures, timeouts, malformed responses, auth failures)
        if (lastError.message.includes('Command not found') ||
            lastError.message.includes('ENOENT') ||
            lastError.message.includes('timeout') ||
            lastError.message.includes('spawn') ||
            lastError.message.includes('Empty response') ||
            lastError.message.includes('Invalid JSON') ||
            lastError.message.includes('Authentication failed') ||
            lastError.message.includes('Request failed') ||
            (lastError as any).code === 'ENOENT') {
          throw lastError;
        }
if (attempt === this.maxRetries) {
break;
}
// Exponential backoff: 1s, 2s, 4s, etc.
const delay = this.retryBaseDelay * Math.pow(2, attempt - 1);
await this.sleep(delay);
}
}
throw new Error(`Max retries exceeded. Last error: ${lastError?.message}`);
}
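  // Worked example: with the default retryBaseDelay of 1000 ms and
  // maxRetries of 3, retryable failures wait 1000 ms after attempt 1 and
  // 2000 ms after attempt 2 (delay = base * 2^(attempt - 1)); no delay
  // follows the final attempt.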
/**
* Execute a single request, managing process pool
*/
private async executeRequest(prompt: string): Promise<ClaudeResponse> {
return new Promise<ClaudeResponse>((resolve, reject) => {
      const request: QueuedRequest = {
        prompt,
        resolve,
        reject,
        timestamp: Date.now(),
      };
      this.requestQueue.push(request);
this.processQueue();
});
}
/**
* Process queued requests within concurrency limits
*/
private processQueue(): void {
while (
this.requestQueue.length > 0 &&
this.activeProcesses.size < this.maxConcurrentProcesses &&
!this.isShuttingDown
) {
const request = this.requestQueue.shift()!;
this.spawnProcess(request);
}
}
/**
* Spawn Claude Code process for a request
*/
private spawnProcess(request: QueuedRequest): void {
const startTime = Date.now();
try {
const childProcess = spawn(this.executable, ['--format', 'json'], {
stdio: ['pipe', 'pipe', 'pipe'],
cwd: this.workingDir,
env: this.env,
});
const processInfo: ProcessInfo = {
process: childProcess,
startTime,
handled: false,
        timeout: setTimeout(() => {
          if (processInfo.handled) return;
          processInfo.handled = true;
          this.killProcess(processInfo);
          request.reject(new Error('timeout'));
          // The 'exit' handler early-returns once `handled` is set, so the
          // freed slot must be drained here.
          this.processQueue();
        }, this.processTimeout),
};
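      // `handled` guards against double settlement: whichever of the
      // timeout, 'error', or 'exit' handlers fires first wins, and the
      // others no-op.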
this.activeProcesses.add(processInfo);
      // Track process spawn for memory leak detection. Use startTime so the
      // ID matches the one reconstructed in cleanupProcess().
      const processId = `${childProcess.pid || 'unknown'}-${startTime}`;
      MemoryLeakIntegration.trackLLMProcess('spawn', processId, this.estimateProcessMemory());
let stdout = '';
let stderr = '';
      // Handle spawn/runtime errors (these are generally not retried)
      childProcess.on('error', (error) => {
        if (processInfo.handled) return;
        processInfo.handled = true;
        this.cleanupProcess(processInfo);
        request.reject(error);
        // 'exit' may never fire after a spawn failure, so drain the queue here
        this.processQueue();
      });
// Collect stdout data
childProcess.stdout?.on('data', (data) => {
stdout += data.toString();
});
// Collect stderr data
childProcess.stderr?.on('data', (data) => {
stderr += data.toString();
});
// Handle process exit
childProcess.on('exit', (code) => {
if (processInfo.handled) return;
processInfo.handled = true;
this.cleanupProcess(processInfo);
if (code === 0) {
try {
const result = this.parseResponse(stdout, startTime);
request.resolve(result);
} catch (error) {
request.reject(error as Error);
}
} else {
const errorMessage = stderr.trim() || `Process exited with code ${code}`;
request.reject(new Error(errorMessage));
}
// Process next queued request
this.processQueue();
});
// Send prompt to stdin
if (childProcess.stdin) {
childProcess.stdin.write(request.prompt);
childProcess.stdin.end();
}
} catch (error) {
request.reject(error as Error);
this.processQueue();
}
}
/**
* Parse Claude Code JSON response
*/
private parseResponse(stdout: string, startTime: number): ClaudeResponse {
if (!stdout.trim()) {
throw new Error('Empty response from Claude Code');
}
let parsed: unknown;
try {
parsed = JSON.parse(stdout.trim());
} catch (error) {
throw new Error(`Invalid JSON response from Claude Code: ${(error as Error).message}`);
}
    const parsedResult = parsed as { response?: string; usage?: ClaudeResponse['usage'] } | null;
    if (!parsedResult || !parsedResult.response) {
      throw new Error('Response missing required "response" field');
    }
    return {
      response: parsedResult.response,
      usage: parsedResult.usage,
      processingTime: Date.now() - startTime,
    };
}
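  // Expected payload shape, inferred from the checks above (the exact CLI
  // output format is an assumption):
  //
  //   { "response": "...", "usage": { "inputTokens": 120, "outputTokens": 480 } }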
/**
* Clean up process resources
*/
  private cleanupProcess(processInfo: ProcessInfo): void {
    // Idempotent: the timeout handler, the 'exit' handler, and shutdown()
    // can each reach this method for the same process.
    if (!this.activeProcesses.has(processInfo)) {
      return;
    }
    clearTimeout(processInfo.timeout);
    this.activeProcesses.delete(processInfo);
    // Track process cleanup for memory leak detection
    const processId = `${processInfo.process.pid || 'unknown'}-${processInfo.startTime}`;
    MemoryLeakIntegration.trackLLMProcess('exit', processId, this.estimateProcessMemory());
    // Only signal processes that are still running; `killed` stays false
    // when a process exits on its own.
    if (processInfo.process.exitCode === null && !processInfo.process.killed) {
      processInfo.process.kill('SIGTERM');
    }
}
/**
* Force kill a process
*/
private killProcess(processInfo: ProcessInfo): void {
if (!processInfo.process.killed) {
processInfo.process.kill('SIGKILL');
}
this.cleanupProcess(processInfo);
}
/**
* Sleep for specified milliseconds
*/
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* Shutdown adapter and kill all active processes
*/
  shutdown(): void {
    this.isShuttingDown = true;
    // Stop the periodic memory check so it cannot keep the event loop alive
    if (this.memoryCheckInterval) {
      clearInterval(this.memoryCheckInterval);
    }
    // Kill all active processes
for (const processInfo of this.activeProcesses) {
this.killProcess(processInfo);
}
// Reject all queued requests
for (const request of this.requestQueue) {
request.reject(new Error('Adapter shutdown'));
}
this.requestQueue = [];
}
/**
* Call LLM with Claude Code (implements ClaudeCodeAdapter interface)
*/
async callLLM(
prompt: string,
options?: {
temperature?: number;
maxTokens?: number;
systemPrompt?: string;
}
): Promise<LLMResponse> {
// Build complete prompt with system prompt if provided
let fullPrompt = prompt;
if (options?.systemPrompt) {
fullPrompt = `System: ${options.systemPrompt}\n\nHuman: ${prompt}`;
}
const response = await this.generateResponse(fullPrompt);
return {
content: response.response,
model: 'claude-via-cli',
tokens: {
prompt: response.usage?.inputTokens ?? 0,
completion: response.usage?.outputTokens ?? 0,
total: (response.usage?.inputTokens ?? 0) + (response.usage?.outputTokens ?? 0),
},
finishReason: 'stop',
latency: response.processingTime,
timestamp: new Date(),
};
}
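  // Sketch (illustrative): the optional systemPrompt is folded into the text
  // sent to the CLI; temperature and maxTokens are accepted for interface
  // compatibility but are not currently forwarded.
  //
  //   const res = await adapter.callLLM('Review this function', {
  //     systemPrompt: 'You are a meticulous code reviewer.',
  //   });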
/**
* Analyze execution trajectory for improvements (implements ClaudeCodeAdapter interface)
*/
async analyzeTrajectory(
trajectory: ExecutionTrajectory,
targetPrompt: string
): Promise<ReflectionAnalysis> {
const analysisPrompt = `
Analyze this execution trajectory and provide improvement suggestions for the prompt.
Trajectory ID: ${trajectory.id}
Task: ${trajectory.taskId}
Success: ${trajectory.finalResult.success}
Score: ${trajectory.finalResult.score}
Execution Steps:
${trajectory.steps.map(step =>
`Step ${step.stepNumber}: ${step.action}${step.error ? ` (Error: ${step.error})` : ''}`
).join('\n')}
Current Prompt:
${targetPrompt}
Please analyze what went wrong and suggest specific improvements to the prompt.
Respond in JSON format with the following structure:
{
"trajectoryId": "${trajectory.id}",
"promptId": "${trajectory.promptId}",
"diagnosis": {
"failurePoint": "description of where it failed",
"rootCause": "root cause analysis",
"patterns": [{"type": "pattern_type", "frequency": 1, "description": "description", "examples": ["example"]}]
},
"suggestions": [
{
"type": "add_instruction",
"targetSection": "section to modify",
"proposedChange": "specific change",
"rationale": "why this helps",
"expectedImpact": 0.8
}
],
"confidence": 0.85,
"rationale": "overall analysis rationale"
}
`;
const response = await this.callLLM(analysisPrompt);
try {
const analysis = JSON.parse(response.content);
// Convert patterns array to FailurePattern objects
const patterns = analysis.diagnosis.patterns || [];
return {
trajectoryId: trajectory.id,
promptId: trajectory.promptId,
diagnosis: {
failurePoint: analysis.diagnosis.failurePoint,
rootCause: analysis.diagnosis.rootCause,
moduleResponsibility: new Map(), // Would need more complex analysis
patterns: patterns,
},
suggestions: analysis.suggestions || [],
confidence: analysis.confidence || 0.5,
rationale: analysis.rationale || 'Automated analysis',
};
} catch (error) {
throw new Error(`Failed to parse trajectory analysis: ${error}`);
}
}
/**
* Generate mutated prompt based on improvement suggestion (implements ClaudeCodeAdapter interface)
*/
async generateMutation(
basePrompt: string,
improvement: PromptImprovement
): Promise<string> {
const mutationPrompt = `
Generate an improved version of this prompt based on the specific improvement suggestion.
Original Prompt:
${basePrompt}
Improvement Suggestion:
- Type: ${improvement.type}
- Target Section: ${improvement.targetSection}
- Proposed Change: ${improvement.proposedChange}
- Rationale: ${improvement.rationale}
Please generate the improved prompt. Return only the improved prompt text, no explanations.
`;
const response = await this.callLLM(mutationPrompt);
return response.content.trim();
}
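  // Sketch (illustrative): suggestions returned by analyzeTrajectory() can
  // be fed straight back in to produce a mutated candidate prompt:
  //
  //   const analysis = await adapter.analyzeTrajectory(trajectory, basePrompt);
  //   const mutated = await adapter.generateMutation(basePrompt, analysis.suggestions[0]);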
/**
* Evaluate prompt performance for evolution engine
*/
async evaluatePrompt(
prompt: string,
taskContext: any
): Promise<{ score: number; trajectory?: ExecutionTrajectory; metrics?: any }> {
try {
const evaluationPrompt = `
You are evaluating the quality of a prompt for the following task context:
Task: ${taskContext.description}
Category: ${taskContext.category}
Difficulty: ${taskContext.difficulty}
Required Capabilities: ${taskContext.requiredCapabilities.join(', ')}
Prompt to evaluate:
${prompt}
Please evaluate this prompt on a scale of 0.0 to 1.0 based on:
1. Clarity and specificity
2. Completeness of instructions
3. Appropriateness for the task
4. Likelihood to produce correct results
Respond in JSON format:
{
"score": 0.85,
"reasoning": "Clear and specific instructions with good structure",
"strengths": ["clear objectives", "good examples"],
"weaknesses": ["could be more specific about format"],
"suggestions": ["add output format specification"]
}
`;
const response = await this.callLLM(evaluationPrompt);
const evaluation = JSON.parse(response.content);
// Create a mock trajectory for compatibility
const trajectory: ExecutionTrajectory = {
id: `eval-${Date.now()}-${Math.random().toString(36).substring(7)}`,
promptId: `prompt-${Date.now()}`,
taskId: taskContext.description,
timestamp: new Date(),
steps: [
{
stepNumber: 1,
action: 'evaluate_prompt',
timestamp: new Date()
}
],
llmCalls: [
{
model: 'claude',
timestamp: new Date(),
prompt: evaluationPrompt,
response: response.content,
tokens: response.tokens || { prompt: 0, completion: 0, total: 0 },
latency: response.latency || 0
}
],
toolCalls: [],
totalTokens: (response.tokens?.total ?? 0),
executionTime: response.latency || 0,
finalResult: {
success: evaluation.score > 0.5,
score: evaluation.score,
output: evaluation.reasoning,
...(evaluation.score <= 0.5 && { error: 'Low quality prompt' })
}
};
return {
score: evaluation.score,
trajectory,
metrics: {
strengths: evaluation.strengths,
weaknesses: evaluation.weaknesses,
suggestions: evaluation.suggestions
}
};
} catch (error) {
// eslint-disable-next-line no-console
console.warn('Failed to evaluate prompt, using fallback scoring:', error);
// Fallback scoring based on prompt characteristics
const score = this.fallbackPromptScore(prompt, taskContext);
return {
score,
metrics: {
fallback: true,
error: (error as Error).message
}
};
}
}
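  // Note: on any LLM or JSON-parse failure, evaluatePrompt() degrades to the
  // heuristic scorer below instead of throwing, so evolution runs keep moving.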
/**
* Fallback prompt scoring when LLM evaluation fails
*/
private fallbackPromptScore(prompt: string, taskContext: any): number {
let score = 0.5; // Base score
// Length check (prompts should be substantial but not too long)
const length = prompt.length;
    if (length > 100 && length <= 2000) {
      score += 0.1;
    } else if (length > 2000) {
      score -= 0.1; // Too long
    } else {
      score -= 0.2; // Too short
    }
// Check for task-relevant keywords
const taskKeywords = taskContext.requiredCapabilities || [];
for (const keyword of taskKeywords) {
if (prompt.toLowerCase().includes(keyword.toLowerCase())) {
score += 0.05;
}
}
// Check for structural elements
if (prompt.includes('\n')) score += 0.05; // Multi-line structure
if (prompt.includes(':')) score += 0.05; // Structured format
if (prompt.includes('example') || prompt.includes('Example')) score += 0.1;
if (prompt.includes('step') || prompt.includes('Step')) score += 0.1;
// Ensure score is within bounds
return Math.max(0, Math.min(1, score));
}
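  // Worked example: a 500-character prompt (+0.1) that names one required
  // capability (+0.05), spans multiple lines (+0.05), uses colons (+0.05),
  // and contains "example" (+0.1) and "step" (+0.1) scores
  // 0.5 + 0.45 = 0.95 before clamping.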
/**
* Setup memory leak detection integration
*/
private setupMemoryLeakDetection(): void {
// Initialize memory leak detection if not already done
MemoryLeakIntegration.initialize();
    // Monitor for excessive process accumulation; keep the handle so
    // shutdown() can clear it
    this.memoryCheckInterval = setInterval(() => {
if (this.activeProcesses.size > this.maxConcurrentProcesses * 2) {
this.emit('warning', `Excessive process count: ${this.activeProcesses.size}`);
// Force cleanup of old processes
const now = Date.now();
for (const processInfo of this.activeProcesses) {
if (now - processInfo.startTime > this.processTimeout * 2) {
this.killProcess(processInfo);
}
}
}
}, 30000); // Check every 30 seconds
}
/**
* Estimate memory usage of a process
*/
private estimateProcessMemory(): number {
// Rough estimation - each Claude process uses ~50MB
return 50 * 1024 * 1024; // 50MB
}
/**
* Perform memory cleanup specific to LLM adapter
*/
async performMemoryCleanup(): Promise<{ killedProcesses: number; freedMemory: number }> {
let killedProcesses = 0;
const now = Date.now();
// Kill processes that have been running too long
for (const processInfo of this.activeProcesses) {
if (now - processInfo.startTime > this.processTimeout) {
this.killProcess(processInfo);
killedProcesses++;
}
}
// Clear request queue if it's too large
if (this.requestQueue.length > 100) {
const removedRequests = this.requestQueue.splice(100);
for (const request of removedRequests) {
request.reject(new Error('Request queue cleanup'));
}
}
const freedMemory = killedProcesses * this.estimateProcessMemory();
return { killedProcesses, freedMemory };
}
}
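
/*
 * End-to-end usage sketch (illustrative; the import path and prompt are
 * assumptions, and top-level await requires an ESM context):
 *
 *   import { LLMAdapter } from './llm-adapter';
 *
 *   const adapter = new LLMAdapter({ maxConcurrentProcesses: 2 });
 *   adapter.on('warning', (msg) => console.warn(msg));
 *   try {
 *     const { content } = await adapter.callLLM('Summarize the staged diff');
 *     console.log(content);
 *   } finally {
 *     adapter.shutdown();
 *   }
 */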