"""任务管理模块 - 处理小说改写任务的分发和状态管理"""
import json
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional

# Put the directory that contains this package on sys.path *before* importing
# `config`. The path tweak exists so that `config` can be resolved; running it
# after `from config import ...` (as the file previously did) makes it useless,
# because the import has already succeeded or failed by then.
sys.path.insert(0, str(Path(__file__).parent.parent))

from config import MIN_WORD_RATIO, FILE_LOCK_TIMEOUT  # noqa: E402

from .utils import count_words, extract_chapter_number  # noqa: E402
from .file_lock import file_lock  # noqa: E402
from .workspace_manager import workspace_manager  # noqa: E402
from .logger import get_logger  # noqa: E402

# Module-level logger.
logger = get_logger(__name__)
class TaskManager:
    """Dispatch chapter-rewrite tasks and track their state, per project.

    Every project stores its task list in ``<output_dir>/<project>/tasks.json``.
    All paths are resolved through ``workspace_manager`` on each call.

    NOTE: ``load_tasks`` and ``save_tasks`` each take their own file lock, so a
    load -> modify -> save sequence (as done by most methods here) is not
    atomic as a whole.
    """

    # Statuses counted in the metadata block, in the order they are written.
    _COUNTED_STATUSES = ("pending", "running", "completed", "failed", "ignored")

    def __init__(self):
        """Create a task manager; it holds no state of its own."""

    @property
    def work_dir(self) -> Path:
        """Current working directory as reported by ``workspace_manager``."""
        return workspace_manager.get_work_dir()

    def get_project_tasks_file(self, project_name: str) -> Path:
        """Return the path of the project's ``tasks.json`` file."""
        return workspace_manager.get_output_dir() / project_name / "tasks.json"

    def get_project_dir(self, project_name: str) -> Path:
        """Return the project's directory inside the output directory."""
        return workspace_manager.get_output_dir() / project_name

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _find_task(data: Dict[str, Any], task_id: str) -> Optional[Dict[str, Any]]:
        """Return the task dict whose ``id`` equals *task_id*, or ``None``."""
        return next((t for t in data["tasks"] if t["id"] == task_id), None)

    @staticmethod
    def _task_not_found(task_id: str) -> Dict[str, Any]:
        """Build the failure result used when *task_id* does not exist."""
        return {
            "success": False,
            "message": f"❌ 未找到任务 {task_id}"
        }

    def _resolve_timed_out_task(
        self,
        project_name: str,
        task: Dict[str, Any],
        elapsed: timedelta
    ) -> Dict[str, Any]:
        """Settle one timed-out running task in place.

        If the task's output file exists the task is marked completed,
        otherwise it is put back to pending.

        Returns:
            ``{"completed": bool, "message": str}`` describing what was done.
        """
        output_file = self.work_dir / task["output_path"]
        if output_file.exists():
            content = output_file.read_text(encoding='utf-8')
            word_count = count_words(content)
            # Use the file's modification time as the completion time rather
            # than the moment this check happens to run.
            finished_at = datetime.fromtimestamp(output_file.stat().st_mtime)

            task["status"] = "completed"
            task["completed_at"] = finished_at.isoformat()
            task["word_count_rewritten"] = word_count
            task["error_message"] = None
            return {
                "completed": True,
                "message": f"✅ [{project_name}] {task['id']}: 发现输出文件,标记为已完成 ({word_count}字)"
            }

        # No output was produced: hand the task back to the queue.
        task["status"] = "pending"
        task["started_at"] = None
        task["error_message"] = f"超时未完成 (运行时长: {elapsed})"
        return {
            "completed": False,
            "message": f"🔄 [{project_name}] {task['id']}: 超时未完成,重置为待处理"
        }

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def load_tasks(self, project_name: str) -> Dict[str, Any]:
        """Load the project's task data under a shared (read) lock.

        Returns:
            The parsed ``tasks.json`` content, or an empty structure with
            zeroed counters when the file does not exist yet.
        """
        tasks_file = self.get_project_tasks_file(project_name)
        if not tasks_file.exists():
            return {
                "tasks": [],
                "metadata": {
                    "total": 0,
                    **dict.fromkeys(self._COUNTED_STATUSES, 0)
                }
            }

        # Shared lock: several readers may hold it at the same time.
        with file_lock(tasks_file, exclusive=False, timeout=FILE_LOCK_TIMEOUT):
            with open(tasks_file, "r", encoding="utf-8") as f:
                return json.load(f)

    def save_tasks(self, project_name: str, data: Dict[str, Any]):
        """Write *data* to the project's ``tasks.json`` under an exclusive lock.

        The parent directory is created when missing.
        """
        tasks_file = self.get_project_tasks_file(project_name)
        tasks_file.parent.mkdir(parents=True, exist_ok=True)

        # Exclusive lock: only one writer at a time.
        with file_lock(tasks_file, exclusive=True, timeout=FILE_LOCK_TIMEOUT):
            with open(tasks_file, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)

    def update_metadata(self, data: Dict[str, Any]):
        """Recompute ``data["metadata"]`` from ``data["tasks"]`` in place.

        The metadata block is rebuilt from scratch: ``total`` plus one counter
        per known status. A truthy ``prompt_file`` entry from the previous
        metadata is carried over; any other previous key is dropped. Tasks
        with an unknown status count towards ``total`` only.
        """
        tasks = data["tasks"]
        # Remember the prompt configuration before the block is replaced.
        existing_prompt_file = data.get("metadata", {}).get("prompt_file")

        # Single pass over the tasks instead of one scan per status.
        counts = dict.fromkeys(self._COUNTED_STATUSES, 0)
        for task in tasks:
            status = task["status"]
            if status in counts:
                counts[status] += 1

        metadata = {"total": len(tasks), **counts}
        if existing_prompt_file:
            metadata["prompt_file"] = existing_prompt_file
        data["metadata"] = metadata

    # ------------------------------------------------------------------
    # Task lifecycle
    # ------------------------------------------------------------------

    def init_tasks(
        self,
        project_name: str,
        source_dir: Path,
        force: bool = False,
        prompt_file: Optional[str] = None
    ) -> Dict[str, Any]:
        """Build the project's task list from the ``*.txt`` files in *source_dir*.

        The files are moved into ``<project>/source`` (unless they already are
        there), ordered by ``extract_chapter_number`` and turned into one
        pending task each.

        Args:
            project_name: Project name.
            source_dir: Directory holding the split source files.
            force: Re-initialise even when a task list already exists.
            prompt_file: Prompt file name; falls back to
                ``config.DEFAULT_PROMPT_FILE_NAME`` when ``None``.

        Returns:
            A result dict with ``success`` and ``message``; on success also
            ``count``, ``metadata`` and ``project_name``.
        """
        data = self.load_tasks(project_name)
        if data["tasks"] and not force:
            return {
                "success": False,
                "message": f"任务列表已存在({len(data['tasks'])}个)。设置force=true重新初始化",
                "count": len(data['tasks'])
            }

        if not source_dir.exists():
            return {
                "success": False,
                "message": f"错误: 源文件夹不存在 {source_dir}",
            }

        project_dir = self.get_project_dir(project_name)
        source_folder = project_dir / "source"
        rewrite_folder = project_dir / "rewrite"
        source_folder.mkdir(parents=True, exist_ok=True)
        rewrite_folder.mkdir(parents=True, exist_ok=True)

        # Move the source files into the project's own "source" folder.
        import shutil
        if source_dir != source_folder:
            for file in source_dir.glob("*.txt"):
                shutil.move(str(file), str(source_folder / file.name))

        txt_files = sorted(
            source_folder.glob("*.txt"),
            key=lambda p: extract_chapter_number(p.name)
        )

        from config import DEFAULT_PROMPT_FILE_NAME
        if prompt_file is None:
            prompt_file = DEFAULT_PROMPT_FILE_NAME

        work_dir = self.work_dir
        tasks = []
        for idx, file_path in enumerate(txt_files, 1):
            # The original word count is later used to derive the minimum
            # length of the rewritten text.
            word_count = count_words(file_path.read_text(encoding='utf-8'))
            rewrite_name = f"{file_path.stem}-rewrite.txt"
            tasks.append({
                "id": f"task_{idx:03d}",
                "file_name": file_path.name,
                # Paths are stored relative to the working directory.
                "source_path": str(file_path.relative_to(work_dir)),
                "output_path": str((rewrite_folder / rewrite_name).relative_to(work_dir)),
                "status": "pending",
                "created_at": datetime.now().isoformat(),
                "started_at": None,
                "completed_at": None,
                "word_count_original": word_count,
                "word_count_rewritten": None,
                "error_message": None
            })

        data = {
            "tasks": tasks,
            "metadata": {"prompt_file": prompt_file},
            "project_name": project_name
        }
        self.update_metadata(data)
        self.save_tasks(project_name, data)

        return {
            "success": True,
            "message": f"✅ 初始化成功!\n项目: {project_name}\n扫描到 {len(tasks)} 个文件\n待处理: {data['metadata']['pending']}",
            "count": len(tasks),
            "metadata": data["metadata"],
            "project_name": project_name
        }

    def get_next_task(
        self,
        project_name: str,
        prompt_file: Optional[Path] = None
    ) -> Optional[Dict[str, Any]]:
        """Claim the first pending task and return what is needed to process it.

        The task is switched to ``running`` and persisted before returning.

        Args:
            project_name: Project name.
            prompt_file: Fallback prompt file path, used only when the project
                metadata does not name a prompt file.

        Returns:
            ``None`` when nothing is pending, otherwise a dict with ``task``,
            ``content`` (source text, empty if the file is missing),
            ``prompt`` and ``min_words``.
        """
        data = self.load_tasks(project_name)
        task = next((t for t in data["tasks"] if t["status"] == "pending"), None)
        if not task:
            return None

        task["status"] = "running"
        task["started_at"] = datetime.now().isoformat()
        task['error_message'] = None

        source_file = self.work_dir / task["source_path"]
        content = ""
        if source_file.exists():
            content = source_file.read_text(encoding='utf-8')
        else:
            # Keep the best-effort behaviour (empty content) but leave a trace.
            logger.warning(f"源文件不存在: {source_file} (任务: {task['id']})")

        self.update_metadata(data)
        self.save_tasks(project_name, data)

        # The prompt configured in the project metadata takes precedence over
        # the caller-supplied fallback.
        prompt = ""
        project_prompt_file = data.get("metadata", {}).get("prompt_file")
        if project_prompt_file:
            prompt_path = workspace_manager.get_prompts_dir() / project_prompt_file
            if prompt_path.exists():
                prompt = prompt_path.read_text(encoding='utf-8')
        elif prompt_file and prompt_file.exists():
            prompt = prompt_file.read_text(encoding='utf-8')

        original_words = task["word_count_original"]
        min_words = int(original_words * MIN_WORD_RATIO) if original_words else 0

        return {
            "task": task,
            "content": content,
            "prompt": prompt,
            "min_words": min_words,
        }

    def complete_task(self, task_id: str, project_name: str) -> Dict[str, Any]:
        """Mark a task as completed once its output file exists.

        Args:
            task_id: Task ID.
            project_name: Project name.

        Returns:
            A result dict with ``success`` and ``message``; on success also
            the updated ``task`` and the project ``metadata``.
        """
        data = self.load_tasks(project_name)
        task = self._find_task(data, task_id)
        if not task:
            return self._task_not_found(task_id)

        # A task can only be completed after its output has been written.
        output_file = self.work_dir / task["output_path"]
        if not output_file.exists():
            return {
                "success": False,
                "message": f"❌ 错误: 目标文件不存在,请先使用 write 工具写入文件\n文件路径: {task['output_path']}"
            }

        word_count = count_words(output_file.read_text(encoding='utf-8'))

        task["status"] = "completed"
        task["completed_at"] = datetime.now().isoformat()
        task["word_count_rewritten"] = word_count
        task["error_message"] = None

        self.update_metadata(data)
        self.save_tasks(project_name, data)

        # Treat a missing/zero original count as 1 to avoid dividing by zero.
        original = task["word_count_original"] or 1
        rate = (original - word_count) / original * 100
        completed = data['metadata']['completed']
        total = data['metadata']['total']
        message = (
            f"✅ 任务完成: {task_id}\n"
            f"原始: {original} 字\n"
            f"改写: {word_count} 字\n"
            f"压缩率: {rate:.1f}%\n"
            f"已保存: {task['output_path']}\n"
            f"进度: {completed}/{total} ({completed/total*100:.0f}%)"
        )

        return {
            "success": True,
            "message": message,
            "task": task,
            "metadata": data["metadata"]
        }

    def fail_task(self, task_id: str, project_name: str, error_message: str) -> Dict[str, Any]:
        """Mark a task as failed and record the error message.

        Args:
            task_id: Task ID.
            project_name: Project name.
            error_message: Error description stored on the task.

        Returns:
            A result dict; ``success`` is ``True`` when the task was found and
            updated, in which case the updated ``task`` is included.
        """
        data = self.load_tasks(project_name)
        task = self._find_task(data, task_id)
        if not task:
            return self._task_not_found(task_id)

        task["status"] = "failed"
        task["error_message"] = error_message
        task["completed_at"] = datetime.now().isoformat()

        self.update_metadata(data)
        self.save_tasks(project_name, data)

        return {
            "success": True,
            "message": f"❌ 任务失败: {task_id}\n错误: {error_message}",
            "task": task
        }

    def reset_task(self, task_id: str, project_name: str) -> Dict[str, Any]:
        """Put a task back to ``pending`` and clear its timestamps and error.

        Args:
            task_id: Task ID.
            project_name: Project name.

        Returns:
            A result dict with ``success`` and ``message``; on success also
            the updated ``task``.
        """
        data = self.load_tasks(project_name)
        task = self._find_task(data, task_id)
        if not task:
            return self._task_not_found(task_id)

        task["status"] = "pending"
        task["started_at"] = None
        task["completed_at"] = None
        task["error_message"] = None

        self.update_metadata(data)
        self.save_tasks(project_name, data)

        return {
            "success": True,
            "message": f"🔄 已重置: {task_id}",
            "task": task
        }

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_status(self, project_name: str) -> Dict[str, Any]:
        """Return the stored counters plus a human-readable progress report.

        Returns:
            A dict with ``metadata`` (as stored) and ``status_text``.
        """
        data = self.load_tasks(project_name)
        m = data["metadata"]

        # max(..., 1) guards against division by zero for empty projects.
        ratio = m["completed"] / max(m["total"], 1)
        progress = int(ratio * 20)  # filled cells of a 20-cell progress bar
        status_text = (
            f"📊 任务统计:\n"
            f"项目: {project_name}\n"
            f"总数: {m['total']}\n"
            f"✅ 已完成: {m['completed']} ({ratio*100:.0f}%)\n"
            f"⏳ 运行中: {m['running']}\n"
            f"📋 待处理: {m['pending']}\n"
            f"❌ 失败: {m['failed']}\n"
            f"🔕 已忽略: {m.get('ignored', 0)}\n"
            f"[{'█'*progress}{'░'*(20-progress)}] {m['completed']}/{m['total']}"
        )

        return {
            "metadata": m,
            "status_text": status_text
        }

    def list_tasks(self, project_name: str, status: str = "all") -> Dict[str, Any]:
        """List the project's tasks, optionally filtered by status.

        Args:
            project_name: Project name.
            status: Status to filter by, or ``"all"`` for no filtering.

        Returns:
            A dict with ``tasks``, ``count``, ``filtered_by`` and
            ``project_name``.
        """
        data = self.load_tasks(project_name)
        if status == "all":
            tasks = data["tasks"]
        else:
            tasks = [t for t in data["tasks"] if t["status"] == status]

        return {
            "tasks": tasks,
            "count": len(tasks),
            "filtered_by": status,
            "project_name": project_name
        }

    def list_projects(self) -> List[str]:
        """Return the sorted names of all directories holding a ``tasks.json``."""
        output_dir = workspace_manager.get_output_dir()
        logger.debug(f"输出目录: {output_dir}")
        if not output_dir.exists():
            return []

        return sorted(
            item.name
            for item in output_dir.iterdir()
            if item.is_dir() and (item / "tasks.json").exists()
        )

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def check_all_projects_timeout(self, timeout_minutes: int) -> Dict[str, Any]:
        """Recover running tasks that exceeded the timeout, in every project.

        A timed-out task whose output file exists is marked completed; one
        without output is reset to pending. A project is written back only
        when at least one of its own tasks was changed. An error in one
        project is reported in the results and does not stop the others.

        Args:
            timeout_minutes: Timeout in minutes.

        Returns:
            A dict with ``checked_count``, ``completed_count``,
            ``recovered_count``, ``projects_count``, ``summary`` and
            ``results``.
        """
        logger.debug(f"开始检查所有项目的超时任务(超时阈值: {timeout_minutes} 分钟)")

        projects = self.list_projects()
        logger.debug(f"发现 {len(projects)} 个项目: {projects}")

        total_checked = 0
        total_completed = 0
        total_recovered = 0
        all_results = []

        for project_name in projects:
            logger.debug(f"检查项目: {project_name}")
            try:
                data = self.load_tasks(project_name)
                now = datetime.now()
                timeout_threshold = timedelta(minutes=timeout_minutes)
                # Tracks changes of *this* project only, so untouched projects
                # are not rewritten just because an earlier one had timeouts.
                changed = False

                for task in data["tasks"]:
                    # Only running tasks with a start time can time out.
                    if task["status"] != "running":
                        continue
                    if not task.get("started_at"):
                        continue

                    elapsed = now - datetime.fromisoformat(task["started_at"])
                    if elapsed < timeout_threshold:
                        continue

                    total_checked += 1
                    logger.debug(
                        f"发现超时任务: {task['id']} (项目: {project_name}, 已运行: {elapsed})")

                    outcome = self._resolve_timed_out_task(project_name, task, elapsed)
                    changed = True
                    if outcome["completed"]:
                        total_completed += 1
                    else:
                        total_recovered += 1
                    all_results.append(outcome["message"])
                    logger.info(outcome["message"])

                if changed:
                    logger.debug(f"保存项目 {project_name} 的任务更新")
                    self.update_metadata(data)
                    self.save_tasks(project_name, data)

            except Exception as e:
                # Boundary handler: report the failing project and carry on.
                error_msg = f"❌ [{project_name}] 检查失败: {e}"
                all_results.append(error_msg)
                logger.error(error_msg, exc_info=True)

        summary = (
            f"🔍 全局超时任务检查完成\n"
            f"⏱️ 超时阈值: {timeout_minutes} 分钟\n"
            f"📁 检查项目数: {len(projects)}\n"
            f"🔎 检查到超时任务: {total_checked} 个\n"
            f"✅ 自动标记为完成: {total_completed} 个\n"
            f"🔄 重置为待处理: {total_recovered} 个\n"
        )

        if all_results:
            summary += "\n详细信息:\n" + "\n".join(all_results)
        else:
            summary += "\n✨ 没有发现超时任务"

        logger.debug(
            f"超时检查完成: 项目数={len(projects)}, 检查={total_checked}, 完成={total_completed}, 恢复={total_recovered}")

        return {
            "checked_count": total_checked,
            "completed_count": total_completed,
            "recovered_count": total_recovered,
            "projects_count": len(projects),
            "summary": summary,
            "results": all_results
        }

    def update_project_prompt(self, project_name: str, prompt_file: str) -> Dict[str, Any]:
        """Set the prompt file recorded in the project's metadata.

        Args:
            project_name: Project name.
            prompt_file: Prompt file name.

        Returns:
            On success a dict with ``success``, ``message`` and
            ``prompt_file``. On failure ``success`` is ``False`` and the
            reason is given under both ``error`` (kept for existing callers)
            and ``message`` (the key every other method uses).
        """
        data = self.load_tasks(project_name)

        if not data.get("tasks"):
            reason = f"项目 {project_name} 不存在或未初始化"
            return {
                "success": False,
                "error": reason,
                "message": reason
            }

        data.setdefault("metadata", {})["prompt_file"] = prompt_file
        self.save_tasks(project_name, data)

        return {
            "success": True,
            "message": f"已将项目 {project_name} 的提示词设置为: {prompt_file}",
            "prompt_file": prompt_file
        }