"""小说处理模块 - 提供小说分割和组合功能"""
import re
import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional
from datetime import datetime
from .utils import (
calculate_tokens,
normalize_chinese_punctuation,
split_by_sentence_endings,
filter_meaningless,
group_by_max_chars,
count_words,
extract_chapter_number,
)
class NovelProcessor:
"""小说处理器"""
@staticmethod
def split_novel(content: str, max_chars: int) -> List[str]:
"""
分割小说内容
Args:
content: 小说内容
max_chars: 最大字符数
Returns:
分割后的段落数组
"""
        # 1. Normalize punctuation (Chinese full-width to English half-width)
        normalized_text = normalize_chinese_punctuation(content)
        # 2. Split on sentence-ending punctuation
        sentences = split_by_sentence_endings(normalized_text)
        # 3. Filter out meaningless fragments
        filtered_sentences = filter_meaningless(sentences)
        # 4. Group sentences into paragraphs of at most max_chars characters
        paragraphs = group_by_max_chars(filtered_sentences, max_chars)
return paragraphs
@staticmethod
def split_novel_with_metadata(
content: str,
max_chars: int,
        source_file: Optional[str] = None
) -> Dict[str, Any]:
"""
分割小说并返回详细元数据
Args:
content: 小说内容
max_chars: 最大字符数
source_file: 源文件名
Returns:
包含元数据和段落的字典
"""
paragraphs = NovelProcessor.split_novel(content, max_chars)
        # Compute token statistics
total_tokens = 0
paragraphs_with_tokens = []
for index, para in enumerate(paragraphs, 1):
tokens = calculate_tokens(para)
total_tokens += tokens
paragraphs_with_tokens.append({
"index": index,
"length": len(para),
"tokens": tokens,
"content": para,
})
return {
"metadata": {
"sourceFile": source_file,
"maxChars": max_chars,
"totalParagraphs": len(paragraphs),
"totalTokens": total_tokens,
"averageTokensPerParagraph": round(total_tokens / len(paragraphs)) if paragraphs else 0,
"tokenCalculationMethod": "tiktoken (cl100k_base)",
"processedAt": datetime.now().isoformat(),
},
"paragraphs": paragraphs_with_tokens,
}
@staticmethod
def save_split_files(
paragraphs: List[Dict[str, Any]],
output_dir: Path,
base_name: str
) -> int:
"""
将分割的段落保存为单独的文件
Args:
paragraphs: 段落列表(带元数据)
output_dir: 输出目录
base_name: 基础文件名
Returns:
保存的文件数量
"""
        # Remove the output directory first if it already exists
        if output_dir.exists():
            shutil.rmtree(output_dir)
        # Create the output directory
        output_dir.mkdir(parents=True, exist_ok=True)
        # Save each paragraph to its own file, named "<index>-<tokens>.txt"
        for para in paragraphs:
            file_name = f"{para['index']}-{para['tokens']}.txt"
            file_path = output_dir / file_name
            file_path.write_text(para['content'], encoding='utf-8')
return len(paragraphs)
@staticmethod
def combine_novels(input_dir: Path) -> Dict[str, Any]:
"""
组合改写文件
Args:
input_dir: 输入目录
Returns:
组合结果信息
"""
        # Find all rewritten files (those with a "-rewrite.txt" suffix)
        rewritten_files = []
        for file_path in input_dir.glob("*-rewrite.txt"):
            chapter_num = extract_chapter_number(file_path.stem)
            rewritten_files.append({
                "chapter": chapter_num,
                "path": file_path,
                "name": file_path.name,
            })
        # Sort by chapter number
        rewritten_files.sort(key=lambda x: x['chapter'])
if not rewritten_files:
return {
"success": False,
"error": "未找到任何改写文件",
"filesFound": 0,
}
        # Combine the contents
        total_words = 0
        output_lines = []
        # Add the document header
        dir_name = input_dir.name
        output_lines.append(f"# {dir_name}\n")
for item in rewritten_files:
            # Read the chapter content
            content = item['path'].read_text(encoding='utf-8')
            if not content.strip():
                continue
            # Count the words in this chapter
            chapter_words = count_words(content)
            total_words += chapter_words
            # Append the chapter heading and content
            output_lines.append(
                f"## 第{item['chapter']}章 (字数: {chapter_words})\n")
            output_lines.append(content.strip())
            output_lines.append("\n\n---\n")
combined_content = "\n".join(output_lines)
return {
"success": True,
"content": combined_content,
"filesFound": len(rewritten_files),
"totalWords": total_tokens,
"chapters": [item['chapter'] for item in rewritten_files],
}
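

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only, not part of the module API):
    # split a novel file, save the chunks, then combine externally produced
    # "*-rewrite.txt" chapters. The paths "novel.txt" and "output/" below are
    # assumptions for demonstration. Because of the relative import above,
    # run this module with "python -m <package>.<module>" rather than directly.
    import json

    source = Path("novel.txt")
    if source.exists():
        text = source.read_text(encoding="utf-8")
        result = NovelProcessor.split_novel_with_metadata(
            text, max_chars=2000, source_file=source.name
        )
        print(json.dumps(result["metadata"], ensure_ascii=False, indent=2))

        out_dir = Path("output") / source.stem
        saved = NovelProcessor.save_split_files(
            result["paragraphs"], out_dir, source.stem
        )
        print(f"Saved {saved} paragraph files to {out_dir}")

        # Once rewritten chapters ("<n>-rewrite.txt") exist in out_dir,
        # they can be merged back into a single document:
        combined = NovelProcessor.combine_novels(out_dir)
        if combined["success"]:
            (out_dir / "combined.md").write_text(
                combined["content"], encoding="utf-8"
            )
            print(f"Combined {combined['filesFound']} chapters, "
                  f"{combined['totalWords']} words")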