// llm.ts
import { Logger } from 'pino';
import { LLMProviderManager } from '../core/llm-provider.js';
import { LLMTransformInput, LLMTransformOutput, JSONSchema, LLMInputData } from '../types/llm.js';
import { ErrorResponse } from '../types/index.js';
import { LLMLogger } from '../utils/llm-logger.js';
import { Config } from '../utils/config.js';
import Ajv from 'ajv';
export class LLMTools {
// Ajv's default-export typings differ between its CJS and ESM builds; the cast sidesteps the mismatch
private ajv = new (Ajv as any)();
private llmLogger: LLMLogger;
constructor(
private llmManager: LLMProviderManager,
private logger: Logger,
private config?: {
autoPreprocess?: boolean;
preprocessing?: {
enabled?: boolean;
intelligentMode?: boolean;
fallbackToTemplates?: boolean;
thresholds?: {
html?: number;
text?: number;
json?: number;
};
preferredModels?: string[];
analysis?: {
maxContentSample?: number;
maxAnalysisTokens?: number;
analysisTemperature?: number;
};
};
logging?: Config['logging'];
}
) {
// LLMLogger only reads the `logging` section, so a partial Config object is sufficient here
this.llmLogger = new LLMLogger(this.logger, { logging: config?.logging || {} } as Config);
}
async transform(params: LLMTransformInput): Promise<LLMTransformOutput | ErrorResponse> {
const startTime = Date.now();
this.logger.info({
model: params.model,
inputKind: params.input.kind,
dataLength: params.input.data.length,
hasPreprocessRequest: !!params.preprocessRequest
}, 'Starting LLM transformation');
try {
let processedInput = params.input;
let preprocessRequest = params.preprocessRequest;
// Auto-generate preprocessing request if not provided and auto-preprocess is enabled
const preprocessingEnabled = this.config?.preprocessing?.enabled !== false && this.config?.autoPreprocess !== false;
if (!preprocessRequest && preprocessingEnabled && this.shouldAutoPreprocess(params.input, params.instruction)) {
if (this.config?.preprocessing?.intelligentMode !== false) {
try {
preprocessRequest = await this.generateIntelligentPreprocessRequest(params.input, params.instruction);
this.logger.info({ autoGeneratedPreprocess: preprocessRequest }, 'Auto-generated intelligent preprocessing request');
} catch (error) {
if (this.config?.preprocessing?.fallbackToTemplates !== false) {
preprocessRequest = this.getTemplateBasedPreprocessRequest(params.input, params.instruction);
this.logger.warn({ error }, 'Intelligent preprocessing failed, using template fallback');
} else {
this.logger.error({ error }, 'Intelligent preprocessing failed and template fallback disabled');
}
}
} else {
preprocessRequest = this.getTemplateBasedPreprocessRequest(params.input, params.instruction);
this.logger.info({ autoGeneratedPreprocess: preprocessRequest }, 'Auto-generated template-based preprocessing request');
}
}
// Preprocessing step if requested or auto-generated
if (preprocessRequest) {
this.logger.info({ isAutoGenerated: !params.preprocessRequest }, 'Running preprocessing step');
const preprocessPrompt = this.buildPreprocessPrompt(params.input, preprocessRequest);
// Use local model for preprocessing (prefer ollama or jan)
const preprocessModel = await this.getLocalModel(params.model);
// Log preprocessing request
const preprocessOperationId = this.llmLogger.logRequest(
'preprocessing',
preprocessModel,
'You are a data preprocessor. Clean and prepare the data according to the request. Return only the processed data without additional commentary.',
preprocessPrompt,
params.maxOutputTokens,
0.1,
{
inputDataLength: params.input.data.length,
inputDataType: params.input.kind,
preprocessRequest
}
);
const preprocessStartTime = Date.now();
let preprocessResponse;
try {
preprocessResponse = await this.llmManager.generate({
systemPrompt: 'You are a data preprocessor. Clean and prepare the data according to the request. Return only the processed data without additional commentary.',
userPrompt: preprocessPrompt,
maxTokens: params.maxOutputTokens,
temperature: 0.1, // Lower temperature for preprocessing
model: preprocessModel
});
// Log preprocessing response
this.llmLogger.logResponse(
preprocessOperationId,
'preprocessing',
preprocessModel,
preprocessResponse.content,
preprocessResponse.tokensUsed,
preprocessStartTime,
true
);
// Update input with preprocessed data
processedInput = {
kind: params.input.kind,
data: preprocessResponse.content.trim()
};
// Log preprocessing comparison and cost analysis
if (preprocessResponse.tokensUsed) {
const estimatedOriginalTokens = Math.ceil(params.input.data.length / 4); // Rough estimate at ~4 characters per token
const costSaved = this.estimateTokenCostSavings(params.model, estimatedOriginalTokens - preprocessResponse.tokensUsed.total);
this.llmLogger.logPreprocessingComparison(
preprocessOperationId,
params.input.data,
processedInput.data,
preprocessRequest,
estimatedOriginalTokens,
preprocessResponse.tokensUsed.total,
costSaved
);
}
this.logger.info({
originalLength: params.input.data.length,
processedLength: processedInput.data.length,
preprocessRequest: preprocessRequest
}, 'Preprocessing completed');
} catch (error) {
// Log preprocessing error
this.llmLogger.logResponse(
preprocessOperationId,
'preprocessing',
preprocessModel,
'',
undefined,
preprocessStartTime,
false,
error instanceof Error ? error.message : String(error)
);
throw error;
}
}
// Prepare the user prompt with processed input
const userPrompt = this.buildUserPrompt({ ...params, input: processedInput });
// Log main processing request
const mainOperationId = this.llmLogger.logRequest(
'main',
params.model,
params.systemPrompt,
userPrompt,
params.maxOutputTokens,
params.temperature,
{
inputDataLength: processedInput.data.length,
inputDataType: processedInput.kind,
instruction: params.instruction,
preprocessRequest: preprocessRequest
}
);
const mainStartTime = Date.now();
let response;
try {
// Call LLM for main processing
response = await this.llmManager.generate({
systemPrompt: params.systemPrompt,
userPrompt,
maxTokens: params.maxOutputTokens,
temperature: params.temperature,
model: params.model
});
// Log main processing response
this.llmLogger.logResponse(
mainOperationId,
'main',
params.model,
response.content,
response.tokensUsed,
mainStartTime,
true
);
} catch (error) {
// Log main processing error
this.llmLogger.logResponse(
mainOperationId,
'main',
params.model,
'',
undefined,
mainStartTime,
false,
error instanceof Error ? error.message : String(error)
);
throw error;
}
// Parse and validate result
const result = await this.parseAndValidateResult(response.content, params.schema);
const duration = Date.now() - startTime;
this.logger.info({
model: params.model,
tokensUsed: response.tokensUsed,
duration
}, 'LLM transformation completed');
// Log a rolling performance summary every 10 operations
if (this.llmLogger.getMetrics().totalOperations % 10 === 0) {
this.llmLogger.logPerformanceSummary();
}
return {
status: 'ok',
result,
raw: response.content,
model: params.model,
tokensUsed: response.tokensUsed
};
} catch (error) {
const duration = Date.now() - startTime;
this.logger.error({
model: params.model,
error: error instanceof Error ? error.message : String(error),
duration
}, 'LLM transformation failed');
return {
status: 'error',
code: 'llm_failed',
message: error instanceof Error ? error.message : 'Unknown error during LLM transformation',
details: {
model: params.model,
inputKind: params.input.kind
}
};
}
}
private buildUserPrompt(params: LLMTransformInput): string {
let prompt = `Task: ${params.instruction}\n\n`;
// Add input data with context
switch (params.input.kind) {
case 'text':
prompt += `Text content:\n${params.input.data}`;
break;
case 'html':
prompt += `HTML content:\n${params.input.data}`;
break;
case 'json':
prompt += `JSON data:\n${params.input.data}`;
break;
}
// Add schema requirements if provided
if (params.schema) {
prompt += `\n\nRequired output format (JSON Schema):\n${JSON.stringify(params.schema, null, 2)}`;
prompt += `\n\nPlease return ONLY valid JSON that conforms to the above schema.`;
} else {
prompt += `\n\nPlease return the result as valid JSON.`;
}
return prompt;
}
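// Illustrative result of buildUserPrompt (hypothetical instruction and data), showing
// how the task, the labelled input, and the schema hint are assembled:
//
//   Task: Extract the article title
//
//   HTML content:
//   <html>...</html>
//
//   Required output format (JSON Schema):
//   {
//     "type": "object",
//     "properties": { "title": { "type": "string" } }
//   }
//
//   Please return ONLY valid JSON that conforms to the above schema.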
private async parseAndValidateResult(content: string, schema?: JSONSchema): Promise<any> {
// Try to extract JSON from the response
let jsonStr = content.trim();
// If content is wrapped in markdown code blocks, extract the JSON
const jsonMatch = jsonStr.match(/```(?:json)?\s*([\s\S]*?)\s*```/);
if (jsonMatch) {
jsonStr = jsonMatch[1].trim();
}
// Parse JSON
let result: any;
try {
result = JSON.parse(jsonStr);
} catch (parseError) {
// Try to find JSON-like content in the response
const potentialJson = this.extractPotentialJson(jsonStr);
if (potentialJson) {
try {
result = JSON.parse(potentialJson);
} catch (secondParseError) {
throw new Error(`Invalid JSON response: ${parseError instanceof Error ? parseError.message : 'Parse error'}`);
}
} else {
throw new Error(`No valid JSON found in response: ${parseError instanceof Error ? parseError.message : 'Parse error'}`);
}
}
// Validate against schema if provided
if (schema) {
const validate = this.ajv.compile(schema);
const valid = validate(result);
if (!valid) {
const errors = validate.errors?.map((err: any) => `${err.instancePath}: ${err.message}`).join(', ') || 'Unknown validation error';
throw new Error(`Schema validation failed: ${errors}`);
}
}
return result;
}
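// Illustrative round trip (hypothetical model reply): a response such as
//
//   Here is the result:
//   ```json
//   { "title": "Example" }
//   ```
//
// has its code fence stripped by the regex above and parses to { title: 'Example' };
// when a schema is supplied, Ajv then rejects any shape that does not conform.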
private extractPotentialJson(text: string): string | null {
// Look for JSON objects or arrays in the text
const patterns = [
/\{[\s\S]*\}/, // Object
/\[[\s\S]*\]/ // Array
];
for (const pattern of patterns) {
const match = text.match(pattern);
if (match) {
return match[0];
}
}
return null;
}
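// Note: both patterns are greedy, so the match runs from the first opening bracket to
// the last closing one. For a hypothetical reply like
//   Sure! {"a": 1} and separately {"b": 2}
// this returns '{"a": 1} and separately {"b": 2}', which fails to parse and is then
// surfaced by parseAndValidateResult as an invalid JSON response.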
private buildPreprocessPrompt(input: LLMInputData, preprocessRequest: string): string {
let prompt = `Preprocessing request: ${preprocessRequest}\n\n`;
switch (input.kind) {
case 'text':
prompt += `Text content to preprocess:\n${input.data}`;
break;
case 'html':
prompt += `HTML content to preprocess:\n${input.data}`;
break;
case 'json':
prompt += `JSON data to preprocess:\n${input.data}`;
break;
}
prompt += `\n\nPlease preprocess the above ${input.kind} according to the request and return only the processed data.`;
return prompt;
}
private async getLocalModel(currentModel: string): Promise<string> {
// If already using a local model, keep it
if (currentModel.startsWith('ollama:') || currentModel.startsWith('jan:')) {
return currentModel;
}
// Get preferred models from config, with fallback to defaults
const preferredModels = this.config?.preprocessing?.preferredModels || [
'ollama:qwen2.5:7b',
'ollama:llama3.2:3b',
'ollama:mistral:7b',
'ollama:llama3.1:8b',
'ollama:llama3.1',
'jan:llama-3.2-3b',
'jan:llama-3.1-8b',
'jan:mistral-7b'
];
// Try preferred models in order
for (const model of preferredModels) {
if (await this.isModelAvailable(model)) {
const modelType = model.startsWith('ollama:') ? 'Ollama' : model.startsWith('jan:') ? 'JAN' : 'Unknown';
this.logger.debug({ model, modelType }, `Selected ${modelType} model for preprocessing`);
return model;
}
}
// Final fallback
this.logger.warn('No local model available, using original model for preprocessing');
return currentModel;
}
private async isModelAvailable(model: string): Promise<boolean> {
try {
// A model counts as available if a provider can be resolved for it;
// a production implementation could also query the provider's model list
this.llmManager.getProvider(model);
return true;
} catch {
return false;
}
}
private shouldAutoPreprocess(input: LLMInputData, instruction: string): boolean {
const instructionLower = instruction.toLowerCase();
// Get thresholds from config
const thresholds = this.config?.preprocessing?.thresholds || {
html: 3000,
text: 5000,
json: 1000
};
// Always preprocess very large content, regardless of the kind-specific thresholds
// (saves tokens and improves quality)
if (input.data.length > 10000) {
return true;
}
// HTML-specific preprocessing logic
if (input.kind === 'html') {
// Always preprocess HTML over configured threshold
if (input.data.length > (thresholds.html || 3000)) return true;
// Preprocess if HTML contains typical noise indicators
const htmlNoise = ['<script', '<style', 'navigation', 'navbar', 'sidebar', 'footer', 'advertisement'];
if (htmlNoise.some(noise => input.data.toLowerCase().includes(noise))) {
return true;
}
// Preprocess for specific extraction tasks
const extractionTasks = ['table', 'product', 'article', 'content', 'data'];
if (extractionTasks.some(task => instructionLower.includes(task))) {
return true;
}
}
// Text-specific preprocessing logic
if (input.kind === 'text') {
// Preprocess text documents over configured threshold
if (input.data.length > (thresholds.text || 5000)) return true;
// Preprocess if text has formatting issues
const hasFormattingIssues = /\s{3,}|\n{3,}|\t{2,}/.test(input.data); // Multiple spaces/newlines/tabs
if (hasFormattingIssues) return true;
// Preprocess for structured extraction tasks
const structuredTasks = ['extract', 'parse', 'summary', 'analyze', 'organize'];
if (structuredTasks.some(task => instructionLower.includes(task))) {
return true;
}
}
// JSON-specific preprocessing logic
if (input.kind === 'json') {
try {
const parsed = JSON.parse(input.data);
// Preprocess large or complex JSON structures
if (Array.isArray(parsed)) {
if (parsed.length > 20) return true;
// Check for inconsistent array items
if (parsed.length > 5 && this.hasInconsistentJsonStructure(parsed)) return true;
} else if (typeof parsed === 'object' && parsed !== null) {
if (Object.keys(parsed).length > 30) return true;
// Check for nested complexity
if (this.hasDeepNesting(parsed, 3)) return true;
}
} catch {
// Malformed JSON definitely needs preprocessing
return true;
}
// Preprocess for data cleaning tasks
const dataTasks = ['clean', 'standardize', 'normalize', 'format', 'validate'];
if (dataTasks.some(task => instructionLower.includes(task))) {
return true;
}
}
// General preprocessing triggers
const cleaningKeywords = [
'clean', 'extract', 'parse', 'standardize', 'normalize', 'filter', 'remove',
'organize', 'structure', 'format', 'process', 'transform', 'prepare'
];
if (cleaningKeywords.some(keyword => instructionLower.includes(keyword))) {
return true;
}
// Preprocess if instruction mentions specific content types that often need cleaning
const noisyContentTypes = ['webpage', 'html', 'scraped', 'raw', 'unstructured', 'messy'];
if (noisyContentTypes.some(type => instructionLower.includes(type))) {
return true;
}
return false;
}
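// Examples of the heuristics above (hypothetical inputs):
//   { kind: 'html', data: '<script>...</script>...' }, any instruction -> true (noise marker)
//   { kind: 'text', data: 'short note' }, 'translate this'             -> false (no trigger fires)
//   { kind: 'json', data: '{oops' }, 'summarize'                       -> true (malformed JSON)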
/**
* Checks if JSON array has inconsistent structure between items
*/
private hasInconsistentJsonStructure(array: any[]): boolean {
if (array.length < 2) return false;
const firstItemKeys = Object.keys(array[0] || {}).sort();
for (let i = 1; i < Math.min(array.length, 5); i++) { // Check first 5 items
const currentKeys = Object.keys(array[i] || {}).sort();
if (JSON.stringify(firstItemKeys) !== JSON.stringify(currentKeys)) {
return true;
}
}
return false;
}
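// Example (hypothetical array): [{ id: 1, name: 'a' }, { id: 2 }] is flagged because the
// sorted key sets ['id', 'name'] and ['id'] differ; only the first five items are
// compared to keep the check cheap on long arrays.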
/**
* Checks if object has deep nesting beyond specified depth
*/
private hasDeepNesting(obj: any, maxDepth: number, currentDepth = 0): boolean {
if (currentDepth >= maxDepth) return true;
if (typeof obj === 'object' && obj !== null) {
for (const value of Object.values(obj)) {
if (typeof value === 'object' && value !== null) {
if (this.hasDeepNesting(value, maxDepth, currentDepth + 1)) {
return true;
}
}
}
}
return false;
}
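// Example (hypothetical objects): hasDeepNesting({ a: { b: { c: {} } } }, 3) returns true,
// since recursing into the object under a.b reaches the empty object at depth 3;
// hasDeepNesting({ a: { b: {} } }, 3) returns false.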
/**
* Generates intelligent preprocessing requests using local LLM analysis
*/
private async generateIntelligentPreprocessRequest(input: LLMInputData, instruction: string): Promise<string> {
try {
// First, try to get a quick analysis from a local model; 'ollama:llama3.1' is the
// default here, and getLocalModel returns it as-is because it already carries a
// local-provider prefix
const localModel = await this.getLocalModel('ollama:llama3.1');
// Create analysis prompt
const analysisPrompt = this.createContentAnalysisPrompt(input, instruction);
// Get analysis settings from config
const analysisSettings = {
maxAnalysisTokens: this.config?.preprocessing?.analysis?.maxAnalysisTokens || 300,
analysisTemperature: this.config?.preprocessing?.analysis?.analysisTemperature || 0.1
};
// Log analysis request
const analysisOperationId = this.llmLogger.logRequest(
'analysis',
localModel,
'You are an expert content analyzer. Analyze the provided content and main task to suggest the most effective preprocessing steps. Be concise and specific.',
analysisPrompt,
analysisSettings.maxAnalysisTokens,
analysisSettings.analysisTemperature,
{
inputDataLength: input.data.length,
inputDataType: input.kind,
instruction
}
);
const analysisStartTime = Date.now();
let analysisResponse;
try {
// Use local model to analyze content and suggest preprocessing
analysisResponse = await this.llmManager.generate({
systemPrompt: 'You are an expert content analyzer. Analyze the provided content and main task to suggest the most effective preprocessing steps. Be concise and specific.',
userPrompt: analysisPrompt,
maxTokens: analysisSettings.maxAnalysisTokens,
temperature: analysisSettings.analysisTemperature,
model: localModel
});
// Log analysis response
this.llmLogger.logResponse(
analysisOperationId,
'analysis',
localModel,
analysisResponse.content,
analysisResponse.tokensUsed,
analysisStartTime,
true
);
} catch (error) {
// Log analysis error
this.llmLogger.logResponse(
analysisOperationId,
'analysis',
localModel,
'',
undefined,
analysisStartTime,
false,
error instanceof Error ? error.message : String(error)
);
throw error;
}
// Parse the analysis response to create preprocessing request
let preprocessRequest = this.parseAnalysisToPreprocessRequest(analysisResponse.content, input.kind);
// If analysis fails, fallback to template-based generation
if (!preprocessRequest || preprocessRequest.length < 10) {
preprocessRequest = this.getTemplateBasedPreprocessRequest(input, instruction);
}
this.logger.debug({
analysisUsed: true,
analysisResponse: analysisResponse.content,
finalPreprocessRequest: preprocessRequest
}, 'Generated intelligent preprocessing request');
// Log preprocessing analysis details
this.llmLogger.logPreprocessingAnalysis(
input.data,
instruction,
analysisResponse.content,
preprocessRequest,
input.data.length,
input.data.length, // Placeholder: the processed length is unknown until preprocessing runs
analysisStartTime,
0 // Placeholder: the preprocessing duration is likewise not yet known
);
return preprocessRequest;
} catch (error) {
this.logger.warn({ error: error instanceof Error ? error.message : String(error) },
'Failed to generate intelligent preprocessing request, falling back to template-based');
// Fallback to template-based generation
return this.getTemplateBasedPreprocessRequest(input, instruction);
}
}
/**
* Creates content analysis prompt for intelligent preprocessing
*/
private createContentAnalysisPrompt(input: LLMInputData, instruction: string): string {
const maxSample = this.config?.preprocessing?.analysis?.maxContentSample || 1000;
const contentSample = this.getContentSample(input.data, maxSample);
return `MAIN TASK: ${instruction}
CONTENT TYPE: ${input.kind}
CONTENT SAMPLE: ${contentSample}
CONTENT LENGTH: ${input.data.length} characters
Analyze this content and main task. What preprocessing steps would be most effective to:
1. Remove noise and irrelevant information
2. Prepare clean, focused data for the main task
3. Optimize for better LLM processing
Suggest specific preprocessing actions in 1-2 sentences.`;
}
/**
* Parses LLM analysis response into actionable preprocessing request
*/
private parseAnalysisToPreprocessRequest(analysisContent: string, dataKind: string): string {
const analysis = analysisContent.toLowerCase();
let request = '';
// Base cleanup based on data type
if (dataKind === 'html') {
request = 'Remove HTML noise: scripts, styles, navigation, ads, footers. ';
} else if (dataKind === 'text') {
request = 'Clean text: normalize whitespace, fix formatting issues. ';
} else if (dataKind === 'json') {
request = 'Clean JSON: remove null/empty values, standardize formats. ';
}
// Parse specific suggestions from analysis
if (analysis.includes('table') || analysis.includes('tabular')) {
request += 'Preserve table structures and data organization. ';
}
if (analysis.includes('product') || analysis.includes('item') || analysis.includes('catalog')) {
request += 'Focus on product/item information, remove marketing fluff. ';
}
if (analysis.includes('article') || analysis.includes('content') || analysis.includes('text')) {
request += 'Extract main content, remove sidebars and distractions. ';
}
if (analysis.includes('data') || analysis.includes('extract') || analysis.includes('parse')) {
request += 'Structure data clearly, ensure consistent formatting. ';
}
if (analysis.includes('date') || analysis.includes('time')) {
request += 'Standardize date/time formats. ';
}
if (analysis.includes('number') || analysis.includes('currency') || analysis.includes('price')) {
request += 'Normalize numeric values and currency formats. ';
}
request += 'Keep only content relevant to the main task.';
return request.trim();
}
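// Example mapping (hypothetical analysis text): for dataKind 'html' and an analysis of
// "Focus on the product table and drop navigation", the keyword checks yield:
//   "Remove HTML noise: scripts, styles, navigation, ads, footers. Preserve table
//    structures and data organization. Focus on product/item information, remove
//    marketing fluff. Keep only content relevant to the main task."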
/**
* Fallback template-based preprocessing request generation
*/
private getTemplateBasedPreprocessRequest(input: LLMInputData, instruction: string): string {
const instructionLower = instruction.toLowerCase();
const templates = this.getPreprocessingTemplates();
// Choose template based on content type and instruction keywords
let template = templates.default;
if (input.kind === 'html') {
if (instructionLower.includes('table')) template = templates.htmlTable;
else if (instructionLower.includes('product')) template = templates.htmlProduct;
else if (instructionLower.includes('article')) template = templates.htmlArticle;
else template = templates.htmlGeneral;
} else if (input.kind === 'text') {
if (instructionLower.includes('summary')) template = templates.textSummary;
else if (instructionLower.includes('extract')) template = templates.textExtract;
else template = templates.textGeneral;
} else if (input.kind === 'json') {
if (instructionLower.includes('table')) template = templates.jsonTable;
else if (instructionLower.includes('date')) template = templates.jsonDate;
else template = templates.jsonGeneral;
}
return template;
}
/**
* Returns preprocessing templates for different scenarios
*/
private getPreprocessingTemplates(): Record<string, string> {
return {
default: 'Clean and prepare the data for better processing by removing noise and standardizing format.',
htmlGeneral: 'Remove HTML noise: scripts, styles, navigation, ads, headers, footers. Keep only main content relevant to the task.',
htmlTable: 'Remove HTML noise but preserve all table structures, headers, and data. Clean table formatting for better parsing.',
htmlProduct: 'Remove navigation, ads, reviews section, related products. Focus on main product information: name, price, description, specifications.',
htmlArticle: 'Remove navigation, sidebars, ads, comments. Extract main article title, content, and relevant metadata.',
textGeneral: 'Clean text: normalize whitespace, fix encoding issues, remove duplicate content. Organize information logically.',
textSummary: 'Clean text and organize into clear sections. Remove redundant information and focus on key points for summarization.',
textExtract: 'Clean text and identify structured information. Prepare for data extraction by organizing content clearly.',
jsonGeneral: 'Clean JSON: remove null/empty values, standardize field names, ensure consistent data types and structure.',
jsonTable: 'Clean JSON table data: remove empty rows, standardize column names, ensure consistent data types, merge duplicates.',
jsonDate: 'Clean JSON and standardize all date/time formats to ISO 8601 (YYYY-MM-DD or YYYY-MM-DDTHH:mm:ss). Fix date parsing issues.'
};
}
/**
* Gets a sample of content for analysis (to avoid token limits)
*/
private getContentSample(content: string, maxLength: number): string {
if (content.length <= maxLength) {
return content;
}
// Take beginning and end samples
const halfLength = Math.floor(maxLength / 2);
const beginning = content.substring(0, halfLength);
const ending = content.substring(content.length - halfLength);
return `${beginning}\n... [content truncated] ...\n${ending}`;
}
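// Example (hypothetical 10,000-character document with maxLength 1000): the sample is
// the first 500 characters, the '[content truncated]' marker, then the last 500
// characters, so the analyzer sees both the head and tail of the content without
// exceeding its token budget.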
/**
* Estimates token cost savings from preprocessing
*/
private estimateTokenCostSavings(model: string, tokensSaved: number): number {
// Rough cost estimates in USD per 1K tokens (as of 2024)
const costs: Record<string, { input: number; output: number }> = {
'gpt-4o': { input: 0.005, output: 0.015 },
'gpt-4o-mini': { input: 0.00015, output: 0.0006 },
'gpt-4': { input: 0.03, output: 0.06 },
'gpt-3.5-turbo': { input: 0.0015, output: 0.002 },
'claude-3-5-sonnet': { input: 0.003, output: 0.015 },
'claude-3-haiku': { input: 0.00025, output: 0.00125 },
'default': { input: 0.002, output: 0.002 } // Fallback for local or unrecognized models
};
// Prefer the longest matching key so that e.g. 'gpt-4o-mini' resolves to its own
// rate rather than to the shorter 'gpt-4o' prefix
const modelKey = Object.keys(costs)
.filter(key => model.includes(key))
.sort((a, b) => b.length - a.length)[0] || 'default';
const cost = costs[modelKey];
// Assume saved tokens are mostly input tokens
const inputCostSaved = (tokensSaved / 1000) * cost.input;
return inputCostSaved;
}
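// Worked example (using the illustrative rates above): trimming 2,000 input tokens on
// 'gpt-4o' saves (2000 / 1000) * 0.005 = $0.01 per call; the same trim on 'gpt-4'
// saves (2000 / 1000) * 0.03 = $0.06.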
/**
* Gets current LLM performance metrics
*/
getLLMMetrics() {
return this.llmLogger.getMetrics();
}
/**
* Logs final performance summary (useful for cleanup/shutdown)
*/
logFinalPerformanceSummary() {
this.llmLogger.logPerformanceSummary();
}
}
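// Usage sketch (hypothetical wiring -- `manager`, `logger`, and `rawHtml` come from the
// host application; field names follow LLMTransformInput as used above):
//
//   const tools = new LLMTools(manager, logger, {
//     preprocessing: { enabled: true, intelligentMode: true }
//   });
//   const result = await tools.transform({
//     model: 'gpt-4o-mini',
//     systemPrompt: 'You are a careful data extractor.',
//     instruction: 'Extract product names and prices from the page',
//     input: { kind: 'html', data: rawHtml },
//     schema: {
//       type: 'object',
//       properties: { products: { type: 'array' } },
//       required: ['products']
//     },
//     maxOutputTokens: 1024,
//     temperature: 0
//   });
//   if (result.status === 'ok') {
//     console.log(result.result, result.tokensUsed);
//   }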