# coding=utf-8
"""
AI 翻译器模块
对推送内容进行多语言翻译
基于 LiteLLM 统一接口,支持 100+ AI 提供商
"""
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from trendradar.ai.client import AIClient
@dataclass
class TranslationResult:
"""翻译结果"""
translated_text: str = "" # 翻译后的文本
original_text: str = "" # 原始文本
success: bool = False # 是否成功
error: str = "" # 错误信息
@dataclass
class BatchTranslationResult:
"""批量翻译结果"""
results: List[TranslationResult] = field(default_factory=list)
success_count: int = 0
fail_count: int = 0
total_count: int = 0
class AITranslator:
"""AI 翻译器"""
def __init__(self, translation_config: Dict[str, Any], ai_config: Dict[str, Any]):
"""
初始化 AI 翻译器
Args:
translation_config: AI 翻译配置 (AI_TRANSLATION)
ai_config: AI 模型配置(LiteLLM 格式)
"""
self.translation_config = translation_config
self.ai_config = ai_config
# 翻译配置
self.enabled = translation_config.get("ENABLED", False)
self.target_language = translation_config.get("LANGUAGE", "English")
# 创建 AI 客户端(基于 LiteLLM)
self.client = AIClient(ai_config)
# 加载提示词模板
self.system_prompt, self.user_prompt_template = self._load_prompt_template(
translation_config.get("PROMPT_FILE", "ai_translation_prompt.txt")
)
def _load_prompt_template(self, prompt_file: str) -> tuple:
"""加载提示词模板"""
config_dir = Path(__file__).parent.parent.parent / "config"
prompt_path = config_dir / prompt_file
if not prompt_path.exists():
print(f"[翻译] 提示词文件不存在: {prompt_path}")
return "", ""
content = prompt_path.read_text(encoding="utf-8")
# 解析 [system] 和 [user] 部分
system_prompt = ""
user_prompt = ""
if "[system]" in content and "[user]" in content:
parts = content.split("[user]")
system_part = parts[0]
user_part = parts[1] if len(parts) > 1 else ""
if "[system]" in system_part:
system_prompt = system_part.split("[system]")[1].strip()
user_prompt = user_part.strip()
else:
user_prompt = content
return system_prompt, user_prompt
def translate(self, text: str) -> TranslationResult:
"""
翻译单条文本
Args:
text: 要翻译的文本
Returns:
TranslationResult: 翻译结果
"""
result = TranslationResult(original_text=text)
if not self.enabled:
result.error = "翻译功能未启用"
return result
if not self.client.api_key:
result.error = "未配置 AI API Key"
return result
if not text or not text.strip():
result.translated_text = text
result.success = True
return result
try:
# 构建提示词
user_prompt = self.user_prompt_template
user_prompt = user_prompt.replace("{target_language}", self.target_language)
user_prompt = user_prompt.replace("{content}", text)
# 调用 AI API
response = self._call_ai(user_prompt)
result.translated_text = response.strip()
result.success = True
except Exception as e:
error_type = type(e).__name__
error_msg = str(e)
if len(error_msg) > 100:
error_msg = error_msg[:100] + "..."
result.error = f"翻译失败 ({error_type}): {error_msg}"
return result
def translate_batch(self, texts: List[str]) -> BatchTranslationResult:
"""
批量翻译文本(单次 API 调用)
Args:
texts: 要翻译的文本列表
Returns:
BatchTranslationResult: 批量翻译结果
"""
batch_result = BatchTranslationResult(total_count=len(texts))
if not self.enabled:
for text in texts:
batch_result.results.append(TranslationResult(
original_text=text,
error="翻译功能未启用"
))
batch_result.fail_count = len(texts)
return batch_result
if not self.client.api_key:
for text in texts:
batch_result.results.append(TranslationResult(
original_text=text,
error="未配置 AI API Key"
))
batch_result.fail_count = len(texts)
return batch_result
if not texts:
return batch_result
# 过滤空文本
non_empty_indices = []
non_empty_texts = []
for i, text in enumerate(texts):
if text and text.strip():
non_empty_indices.append(i)
non_empty_texts.append(text)
# 初始化结果列表
for text in texts:
batch_result.results.append(TranslationResult(original_text=text))
# 空文本直接标记成功
for i, text in enumerate(texts):
if not text or not text.strip():
batch_result.results[i].translated_text = text
batch_result.results[i].success = True
batch_result.success_count += 1
if not non_empty_texts:
return batch_result
try:
# 构建批量翻译内容(使用编号格式)
batch_content = self._format_batch_content(non_empty_texts)
# 构建提示词
user_prompt = self.user_prompt_template
user_prompt = user_prompt.replace("{target_language}", self.target_language)
user_prompt = user_prompt.replace("{content}", batch_content)
# 调用 AI API
response = self._call_ai(user_prompt)
# 解析批量翻译结果
translated_texts = self._parse_batch_response(response, len(non_empty_texts))
# 填充结果
for idx, translated in zip(non_empty_indices, translated_texts):
batch_result.results[idx].translated_text = translated
batch_result.results[idx].success = True
batch_result.success_count += 1
except Exception as e:
error_msg = f"批量翻译失败: {type(e).__name__}: {str(e)[:100]}"
for idx in non_empty_indices:
batch_result.results[idx].error = error_msg
batch_result.fail_count = len(non_empty_indices)
return batch_result
def _format_batch_content(self, texts: List[str]) -> str:
"""格式化批量翻译内容"""
lines = []
for i, text in enumerate(texts, 1):
lines.append(f"[{i}] {text}")
return "\n".join(lines)
def _parse_batch_response(self, response: str, expected_count: int) -> List[str]:
"""
解析批量翻译响应
Args:
response: AI 响应文本
expected_count: 期望的翻译数量
Returns:
List[str]: 翻译结果列表
"""
results = []
lines = response.strip().split("\n")
current_idx = None
current_text = []
for line in lines:
# 尝试匹配 [数字] 格式
stripped = line.strip()
if stripped.startswith("[") and "]" in stripped:
bracket_end = stripped.index("]")
try:
idx = int(stripped[1:bracket_end])
# 保存之前的内容
if current_idx is not None:
results.append((current_idx, "\n".join(current_text).strip()))
current_idx = idx
current_text = [stripped[bracket_end + 1:].strip()]
except ValueError:
if current_idx is not None:
current_text.append(line)
else:
if current_idx is not None:
current_text.append(line)
# 保存最后一条
if current_idx is not None:
results.append((current_idx, "\n".join(current_text).strip()))
# 按索引排序并提取文本
results.sort(key=lambda x: x[0])
translated = [text for _, text in results]
# 如果解析结果数量不匹配,尝试简单按行分割
if len(translated) != expected_count:
# 回退:按行分割(去除编号)
translated = []
for line in lines:
stripped = line.strip()
if stripped.startswith("[") and "]" in stripped:
bracket_end = stripped.index("]")
translated.append(stripped[bracket_end + 1:].strip())
elif stripped:
translated.append(stripped)
# 确保返回正确数量
while len(translated) < expected_count:
translated.append("")
return translated[:expected_count]
def _call_ai(self, user_prompt: str) -> str:
"""调用 AI API(使用 LiteLLM)"""
messages = []
if self.system_prompt:
messages.append({"role": "system", "content": self.system_prompt})
messages.append({"role": "user", "content": user_prompt})
return self.client.chat(messages)