/**
* Map-Reduce 总结工具
* 处理大量内容的智能总结:并行提取关键点,然后合并生成最终总结
*/
import { logger } from './logger.js';
import { SummarizerService } from '../services/summarizer.js';
/**
 * Options accepted by `mapReduceSummarize`. Every field is optional.
 */
export interface MapReduceOptions {
chunkSize?: number; // Maximum chunk length in characters (default 30000)
mapPrompt?: string; // Prompt used in the map phase
reducePrompt?: string; // Prompt used in the reduce phase
maxConcurrency?: number; // Maximum number of concurrent map-phase calls (default 5)
}
/**
 * A piece of page content queued for the map phase.
 */
interface ContentChunk {
index: number; // Sequential position of the chunk across all inputs
url: string; // URL of the page the chunk was taken from
content: string; // Text of the chunk
}
/**
 * Outcome of the map phase for a single chunk.
 */
interface MapResult {
index: number; // Index of the chunk this result was produced from
url: string; // URL of the chunk's source page
keypoints: string; // Extracted key points
}
import { DEFAULT_MAP_PROMPT, DEFAULT_REDUCE_PROMPT } from '../prompts/map-reduce.js';
/**
 * Map-Reduce summarization.
 *
 * Runs the map phase (key point extraction per chunk) and then the reduce phase
 * (merging the key points into one summary). If either phase throws, the error is
 * logged and the function falls back to one summarization call over all non-empty
 * contents joined with a `---` separator.
 *
 * @param contents Page contents to summarize, each with its source URL.
 * @param summarizer LLM summarization service.
 * @param options Optional overrides. Missing or `undefined` fields use the defaults;
 *   `chunkSize` / `maxConcurrency` values that are not numbers >= 1 also use the defaults.
 * @returns The final summary, or a fixed failure message when the fallback has no
 *   content to work with.
 * @throws Any error raised by the fallback summarization call (it is not caught here).
 */
export async function mapReduceSummarize(
  contents: Array<{url: string; content: string}>,
  summarizer: SummarizerService,
  options?: MapReduceOptions
): Promise<string> {
  const startTime = Date.now();

  // Resolve every option individually. Spreading `options` over the defaults would let an
  // explicit `undefined` (or 0 / NaN / a negative number) replace a default, and a
  // concurrency limit that is not >= 1 makes every semaphore acquire wait forever.
  const positiveOr = (value: number | undefined, fallback: number): number =>
    typeof value === 'number' && value >= 1 ? Math.floor(value) : fallback;

  const opts: Required<MapReduceOptions> = {
    chunkSize: positiveOr(options?.chunkSize, 30000),
    maxConcurrency: positiveOr(options?.maxConcurrency, 5),
    mapPrompt: options?.mapPrompt ?? DEFAULT_MAP_PROMPT,
    reducePrompt: options?.reducePrompt ?? DEFAULT_REDUCE_PROMPT
  };

  logger.info('Starting Map-Reduce summarization', {
    contentCount: contents.length,
    totalLength: contents.reduce((sum, c) => sum + c.content.length, 0),
    chunkSize: opts.chunkSize,
    maxConcurrency: opts.maxConcurrency
  });

  try {
    // 1. Map phase: extract key points
    const mapResults = await mapPhase(contents, summarizer, opts);

    // 2. Reduce phase: merge into the final summary
    const finalSummary = await reducePhase(mapResults, summarizer, opts);

    const duration = Date.now() - startTime;
    logger.info('Map-Reduce summarization completed', {
      initialContentCount: contents.length,
      mapResultsCount: mapResults.length,
      duration: `${duration}ms`
    });

    return finalSummary;
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error';
    logger.error('Map-Reduce summarization failed', { error: errorMessage });

    // Degraded mode: join all non-empty contents and summarize them in one call.
    const combinedContent = contents
      .filter(c => c.content.length > 0)
      .map(c => c.content)
      .join('\n\n---\n\n');

    if (combinedContent.length > 0) {
      logger.info('Falling back to simple summarization');
      // Honor the caller's reduce prompt here as well, not only in the reduce phase.
      // NOTE(review): the text is sent both as `content` and inside `prompt`; confirm
      // against SummarizerService that this duplication is intended.
      const response = await summarizer.summarize({
        content: combinedContent,
        prompt: opts.reducePrompt + '\n' + combinedContent
      });
      return response.summary;
    }

    return '总结生成失败:无法处理输入内容';
  }
}
/**
 * Map phase: extract the key points of every page in parallel.
 *
 * Empty contents are skipped, contents longer than `options.chunkSize` are split with
 * `splitContent`, and every resulting chunk is summarized while a semaphore keeps at most
 * `options.maxConcurrency` calls in flight. A chunk whose summarization throws is logged
 * and reported with the placeholder key points '提取失败' instead of failing the phase.
 *
 * @param contents Pages to process, each with its source URL.
 * @param summarizer LLM summarization service.
 * @param options Fully resolved map-reduce options.
 * @returns One result per chunk, ordered by chunk index.
 */
async function mapPhase(
  contents: Array<{url: string; content: string}>,
  summarizer: SummarizerService,
  options: Required<MapReduceOptions>
): Promise<MapResult[]> {
  // Build the chunk list: drop empty pages, split oversized ones, then number the pieces.
  const chunks: ContentChunk[] = contents
    .filter((page) => page.content.length !== 0)
    .flatMap((page) =>
      page.content.length > options.chunkSize
        ? splitContent(page.content, options.chunkSize).map((piece) => ({
            url: page.url,
            content: piece
          }))
        : [{ url: page.url, content: page.content }]
    )
    .map((piece, position) => ({ index: position, url: piece.url, content: piece.content }));

  logger.info('Map phase: processing chunks', {
    originalCount: contents.length,
    chunkCount: chunks.length
  });

  // Summarize one chunk; failures are turned into a placeholder result.
  const extractKeypoints = async (chunk: ContentChunk): Promise<MapResult> => {
    const { index, url } = chunk;
    try {
      const response = await summarizer.summarize({
        content: `${options.mapPrompt}\n\n网页来源:${url}\n\n${chunk.content}`
      });
      return { index, url, keypoints: response.summary || '无法提取关键点' };
    } catch (error) {
      logger.error('Map phase failed for chunk', {
        url,
        index,
        error: error instanceof Error ? error.message : 'Unknown error'
      });
      return { index, url, keypoints: '提取失败' };
    }
  };

  // Bounded-concurrency execution
  const limiter = createSemaphore(options.maxConcurrency);
  const results = await Promise.all(
    chunks.map((chunk) => withSemaphore(limiter, () => extractKeypoints(chunk)))
  );

  // Keep the results in original chunk order
  results.sort((left, right) => left.index - right.index);
  return results;
}
/**
 * Reduce phase: merge all extracted key points into the final summary.
 *
 * Map results that carry no real key points (an empty value, or one of the two
 * placeholder strings the map phase emits for failed / empty extractions) are left out.
 * The remaining ones are numbered, joined with a `---` separator, appended to
 * `options.reducePrompt` and sent to the summarizer in a single call.
 *
 * @param mapResults Results produced by the map phase.
 * @param summarizer LLM summarization service.
 * @param options Fully resolved map-reduce options; only `reducePrompt` is read here.
 * @returns The summarizer's answer, or a fixed message when no usable key points remain.
 */
async function reducePhase(
  mapResults: MapResult[],
  summarizer: SummarizerService,
  options: Required<MapReduceOptions>
): Promise<string> {
  // Placeholder key points written by the map phase: '提取失败' when the call threw,
  // '无法提取关键点' when the summarizer returned nothing. Neither is real content, so
  // neither should be presented to the LLM as a source.
  const placeholders = new Set(['提取失败', '无法提取关键点']);
  const usable = mapResults.filter(r => r.keypoints && !placeholders.has(r.keypoints));

  if (usable.length === 0) {
    return '无法从网页中提取有效信息进行总结';
  }

  // Merge the key points, numbering the sources consecutively after filtering
  const combinedKeypoints = usable
    .map((r, i) => `【来源${i + 1}】${r.url}\n${r.keypoints}`)
    .join('\n\n---\n\n');

  logger.info('Reduce phase: combining keypoints', {
    // Count only the results that are actually part of the prompt
    sourceCount: usable.length,
    skippedCount: mapResults.length - usable.length,
    combinedLength: combinedKeypoints.length
  });

  // Generate the final summary
  const prompt = `${options.reducePrompt}\n\n${combinedKeypoints}`;
  const response = await summarizer.summarize({
    content: prompt
  });

  return response.summary;
}
/**
 * Split a large text into chunks of at most `maxSize` characters without losing text.
 *
 * Splitting prefers the coarsest boundary that fits:
 * 1. paragraphs (separated by a blank line) are packed together, re-joined with `\n\n`;
 * 2. a paragraph longer than `maxSize` is split into sentences, each keeping its
 *    terminating punctuation, and the sentences are packed together;
 * 3. a sentence longer than `maxSize` is cut into consecutive fixed-size pieces.
 *
 * @param content Text to split.
 * @param maxSize Maximum chunk length in UTF-16 code units; fractional values are floored.
 * @returns The chunks in document order; empty when `content` is empty.
 * @throws RangeError If `maxSize` is not a number >= 1.
 */
function splitContent(content: string, maxSize: number): string[] {
  if (!(maxSize >= 1)) {
    throw new RangeError(`maxSize must be a number >= 1, got ${maxSize}`);
  }
  const limit = Math.floor(maxSize);
  const paragraphSeparator = '\n\n';
  // Zero-width split point right after a sentence terminator, so the punctuation stays
  // attached to its sentence. Covers CJK full stop, full-width and ASCII `!` / `?`, and `.`.
  const sentenceBoundary = /(?<=[。\uFF01\uFF1F.!?])/;

  const chunks: string[] = [];
  let currentChunk = '';

  // Emit the chunk being built, if any.
  const flush = (): void => {
    if (currentChunk) {
      chunks.push(currentChunk);
      currentChunk = '';
    }
  };

  // Cut text that has no usable boundary into pieces of at most `limit` code units.
  const pushHardSplit = (text: string): void => {
    let start = 0;
    while (start < text.length) {
      let end = Math.min(start + limit, text.length);
      // Do not cut a surrogate pair in half when the piece has room to back off.
      if (end < text.length && end - start > 1) {
        const code = text.charCodeAt(end - 1);
        if (code >= 0xd800 && code <= 0xdbff) {
          end--;
        }
      }
      chunks.push(text.slice(start, end));
      start = end;
    }
  };

  for (const paragraph of content.split(/\n\s*\n/)) {
    if (paragraph.length === 0) continue;

    if (paragraph.length > limit) {
      // Oversized paragraph: close the running chunk and pack sentence by sentence.
      flush();
      for (const sentence of paragraph.split(sentenceBoundary)) {
        if (sentence.length > limit) {
          flush();
          pushHardSplit(sentence);
        } else if (currentChunk.length + sentence.length > limit) {
          flush();
          currentChunk = sentence;
        } else {
          currentChunk += sentence;
        }
      }
      // Never merge the tail of an oversized paragraph with the next paragraph.
      flush();
      continue;
    }

    // The separator counts towards the limit when the chunk already has content.
    const separatorLength = currentChunk ? paragraphSeparator.length : 0;
    if (currentChunk.length + separatorLength + paragraph.length > limit) {
      flush();
      currentChunk = paragraph;
    } else {
      currentChunk += (currentChunk ? paragraphSeparator : '') + paragraph;
    }
  }

  flush();
  return chunks;
}
/**
 * Create a counting semaphore used to bound concurrency.
 *
 * `acquire()` resolves immediately while fewer than `maxCount` permits are held and
 * otherwise waits in FIFO order. `release()` passes the permit to the oldest waiter, or
 * frees it when nobody is waiting.
 *
 * @param maxCount Maximum number of permits that may be held at once.
 * @returns An object with `acquire` and `release`.
 * @throws RangeError If `maxCount` is not a number >= 1; with such a limit no
 *   `acquire()` could ever resolve.
 */
function createSemaphore(maxCount: number) {
  if (!(maxCount >= 1)) {
    throw new RangeError(`Semaphore maxCount must be a number >= 1, got ${maxCount}`);
  }

  let count = 0;
  const queue: (() => void)[] = [];

  return {
    acquire: () => new Promise<void>((resolve) => {
      if (count < maxCount) {
        count++;
        resolve();
      } else {
        queue.push(resolve);
      }
    }),
    release: () => {
      const next = queue.shift();
      if (next) {
        // Hand the permit straight to the oldest waiter; `count` does not change.
        next();
      } else {
        count--;
      }
    }
  };
}
/**
 * Run an async operation while holding one permit of the semaphore.
 *
 * The permit is acquired before `fn` starts and released once `fn` has settled,
 * whether it resolved or threw.
 *
 * @param semaphore Semaphore created by `createSemaphore`.
 * @param fn Operation to run under the permit.
 * @returns The value `fn` resolves to; a failure of `fn` is propagated unchanged.
 */
async function withSemaphore<T>(
  semaphore: ReturnType<typeof createSemaphore>,
  fn: () => Promise<T>
): Promise<T> {
  await semaphore.acquire();

  let outcome: T;
  try {
    outcome = await fn();
  } finally {
    // Always give the permit back, even when fn failed
    semaphore.release();
  }
  return outcome;
}