# translation_mcp.py
#!/usr/bin/env python3
"""
文本翻译 MCP 服务
一个基于 Model Context Protocol (MCP) 的文本翻译服务,
支持 OpenAI 和兼容 OpenAI API 的模型。
使用方法:
uvx translation_mcp.py
或者:
python translation_mcp.py
"""
import asyncio
import json
import os
import sys
from typing import Any, Dict, List, Optional, Sequence
from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import EmbeddedResource, ImageContent, Resource, TextContent, Tool
class OpenAITranslator:
    """Translator backed by OpenAI (or any OpenAI-compatible) chat model.

    Server-wide defaults are read from the environment in ``__init__``;
    every public method additionally accepts per-call ``model`` /
    ``base_url`` overrides.
    """

    def __init__(self):
        # Defaults come from the environment. OPENAI_BASE_URL is optional
        # and allows pointing at any OpenAI-compatible endpoint.
        self.default_api_key = os.getenv("OPENAI_API_KEY")
        self.default_model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
        self.default_base_url = os.getenv("OPENAI_BASE_URL")
        if not self.default_api_key:
            raise ValueError("OPENAI_API_KEY 环境变量未设置")

    def _create_llm(self, model: str, base_url: Optional[str] = None) -> "ChatOpenAI":
        """Build a ChatOpenAI client for a single request.

        Low temperature (0.1) keeps translations stable/deterministic;
        max_tokens bounds the response size for one chunk.
        """
        kwargs = {
            "model": model,
            "api_key": self.default_api_key,
            "temperature": 0.1,
            "max_tokens": 2000
        }
        if base_url:
            kwargs["base_url"] = base_url
        return ChatOpenAI(**kwargs)

    async def translate(
        self,
        text: str,
        target_language: str,
        source_language: Optional[str] = None,
        model: Optional[str] = None,
        base_url: Optional[str] = None
    ) -> Dict[str, Any]:
        """Translate *text* into *target_language*.

        Returns a dict with ``success`` plus either the translation metadata
        or an ``error`` message. Never raises: failures are reported in-band.
        """
        try:
            # Fall back to the server-wide defaults when no override is given.
            actual_model = model or self.default_model
            actual_base_url = base_url or self.default_base_url
            llm = self._create_llm(actual_model, actual_base_url)
            # Mention the source language in the prompt only when the caller
            # supplied one; otherwise let the model auto-detect it.
            if source_language:
                prompt = f"请将以下{source_language}文本翻译成{target_language},只返回翻译结果:\n\n{text}"
            else:
                prompt = f"请将以下文本翻译成{target_language},只返回翻译结果:\n\n{text}"
            messages = [
                SystemMessage(content="你是一个专业的翻译助手,请提供准确、自然的翻译。"),
                HumanMessage(content=prompt)
            ]
            response = await llm.ainvoke(messages)
            translated_text = response.content.strip()
            return {
                "translated_text": translated_text,
                "source_language": source_language or "auto",
                "target_language": target_language,
                # Bug fix: report the values actually used. Previously the
                # raw arguments were echoed, so defaulted calls showed null.
                "model": actual_model,
                "base_url": actual_base_url,
                "success": True
            }
        except Exception as e:
            return {
                "error": str(e),
                "success": False
            }

    async def detect_language(
        self,
        text: str,
        model: Optional[str] = None,
        base_url: Optional[str] = None
    ) -> str:
        """Detect the language of *text* and return a language code.

        Returns ``"unknown"`` on any failure instead of raising.
        """
        try:
            actual_model = model or self.default_model
            actual_base_url = base_url or self.default_base_url
            llm = self._create_llm(actual_model, actual_base_url)
            prompt = f"请检测以下文本的语言,只返回语言代码(如:en, zh, ja, ko):\n\n{text}"
            messages = [
                SystemMessage(content="你是一个语言检测助手,请返回准确的语言代码。"),
                HumanMessage(content=prompt)
            ]
            response = await llm.ainvoke(messages)
            return response.content.strip().lower()
        except Exception:
            # Deliberate best-effort: detection failure is not fatal.
            return "unknown"

    def split_text_into_chunks(self, text: str, max_chunk_size: int = 2000) -> List[str]:
        """Split *text* on newlines into chunks of at most *max_chunk_size* chars.

        Lines are never broken, so a single line longer than the limit
        becomes its own oversized chunk.
        """
        lines = text.split('\n')
        chunks: List[str] = []
        current_chunk: List[str] = []
        current_size = 0
        for line in lines:
            line_size = len(line) + 1  # +1 accounts for the joining newline
            # Flush the current chunk first if this line would overflow it.
            if current_size + line_size > max_chunk_size and current_chunk:
                chunks.append('\n'.join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size
        if current_chunk:
            chunks.append('\n'.join(current_chunk))
        return chunks

    async def translate_long_text(
        self,
        text: str,
        target_language: str,
        source_language: Optional[str] = None,
        max_chunk_size: int = 2000,
        model: Optional[str] = None,
        base_url: Optional[str] = None
    ) -> Dict[str, Any]:
        """Translate long text by chunking it and translating chunk by chunk.

        Short inputs are delegated straight to :meth:`translate`. The first
        failing chunk aborts the whole translation with an error result.
        """
        try:
            if len(text) <= max_chunk_size:
                return await self.translate(text, target_language, source_language, model, base_url)
            chunks = self.split_text_into_chunks(text, max_chunk_size)
            translated_chunks = []
            for i, chunk in enumerate(chunks):
                try:
                    result = await self.translate(chunk, target_language, source_language, model, base_url)
                    if result.get("success"):
                        translated_chunks.append(result.get("translated_text", ""))
                    else:
                        return {
                            "error": f"翻译第 {i+1} 块失败: {result.get('error', '未知错误')}",
                            "success": False
                        }
                    # Small pause between chunks to avoid rate limiting.
                    if i < len(chunks) - 1:
                        await asyncio.sleep(0.5)
                except Exception as e:
                    return {
                        "error": f"翻译第 {i+1} 块时发生错误: {str(e)}",
                        "success": False
                    }
            translated_text = '\n'.join(translated_chunks)
            return {
                "translated_text": translated_text,
                # Consistency fix: translate() reports "auto" for this case,
                # so report the same token here (was "auto-detected").
                "source_language": source_language or "auto",
                "target_language": target_language,
                "chunks_count": len(chunks),
                "success": True
            }
        except Exception as e:
            return {
                "error": f"分块翻译失败: {str(e)}",
                "success": False
            }

    def get_supported_languages(self) -> List[str]:
        """Return the list of supported ISO 639-1 language codes."""
        return [
            "en", "zh", "ja", "ko", "es", "fr", "de", "it", "pt", "ru",
            "ar", "hi", "th", "vi", "id", "ms", "tl", "tr", "pl", "nl"
        ]
class TranslationMCPServer:
    """MCP server exposing translation tools and resources over stdio."""

    def __init__(self):
        self.server = Server("translation-mcp")
        # Raises ValueError if OPENAI_API_KEY is missing.
        self.translator = OpenAITranslator()
        self._setup_handlers()

    def _setup_handlers(self):
        """Register the MCP tool and resource handlers on the server."""

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """Advertise the available tools and their JSON input schemas."""
            # Optional per-call overrides, supported by OpenAITranslator but
            # previously not exposed through the tool schemas.
            model_property = {
                "type": "string",
                "description": "覆盖默认模型名称 (可选)"
            }
            base_url_property = {
                "type": "string",
                "description": "覆盖 OpenAI 兼容 API 地址 (可选)"
            }
            return [
                Tool(
                    name="translate_text",
                    description="翻译文本到指定语言",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "text": {
                                "type": "string",
                                "description": "要翻译的文本"
                            },
                            "target_language": {
                                "type": "string",
                                "description": "目标语言代码 (如: en, zh, ja, ko)"
                            },
                            "source_language": {
                                "type": "string",
                                "description": "源语言代码 (可选,自动检测)"
                            },
                            "model": dict(model_property),
                            "base_url": dict(base_url_property)
                        },
                        "required": ["text", "target_language"]
                    }
                ),
                Tool(
                    name="detect_language",
                    description="检测文本语言",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "text": {
                                "type": "string",
                                "description": "要检测语言的文本"
                            },
                            "model": dict(model_property),
                            "base_url": dict(base_url_property)
                        },
                        "required": ["text"]
                    }
                ),
                Tool(
                    name="translate_long_text",
                    description="翻译长文本(自动分块处理,适合处理大文档)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "text": {
                                "type": "string",
                                "description": "要翻译的长文本"
                            },
                            "target_language": {
                                "type": "string",
                                "description": "目标语言代码 (如: en, zh, ja, ko)"
                            },
                            "source_language": {
                                "type": "string",
                                "description": "源语言代码 (可选,自动检测)"
                            },
                            "max_chunk_size": {
                                "type": "integer",
                                "description": "每个文本块的最大字符数 (默认: 2000)",
                                "default": 2000
                            },
                            "model": dict(model_property),
                            "base_url": dict(base_url_property)
                        },
                        "required": ["text", "target_language"]
                    }
                )
            ]

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: dict) -> List[TextContent]:
            """Dispatch a tool call; errors are returned as JSON, never raised."""
            try:
                if name == "translate_text":
                    result = await self.translator.translate(
                        text=arguments["text"],
                        target_language=arguments["target_language"],
                        source_language=arguments.get("source_language"),
                        # Pass through the optional per-call overrides.
                        model=arguments.get("model"),
                        base_url=arguments.get("base_url")
                    )
                    return [TextContent(
                        type="text",
                        text=json.dumps(result, ensure_ascii=False, indent=2)
                    )]
                elif name == "detect_language":
                    language = await self.translator.detect_language(
                        text=arguments["text"],
                        model=arguments.get("model"),
                        base_url=arguments.get("base_url")
                    )
                    result = {
                        "detected_language": language,
                        "success": True
                    }
                    return [TextContent(
                        type="text",
                        text=json.dumps(result, ensure_ascii=False, indent=2)
                    )]
                elif name == "translate_long_text":
                    result = await self.translator.translate_long_text(
                        text=arguments["text"],
                        target_language=arguments["target_language"],
                        source_language=arguments.get("source_language"),
                        max_chunk_size=arguments.get("max_chunk_size", 2000),
                        model=arguments.get("model"),
                        base_url=arguments.get("base_url")
                    )
                    return [TextContent(
                        type="text",
                        text=json.dumps(result, ensure_ascii=False, indent=2)
                    )]
                else:
                    raise ValueError(f"Unknown tool: {name}")
            except Exception as e:
                # Report failures in-band so the client gets structured JSON.
                error_result = {
                    "error": str(e),
                    "success": False
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(error_result, ensure_ascii=False, indent=2)
                )]

        @self.server.list_resources()
        async def handle_list_resources() -> List[Resource]:
            """Advertise the static resources this server provides."""
            return [
                Resource(
                    uri="translation://languages",
                    name="支持的语言列表",
                    description="获取所有支持的语言代码和名称",
                    mimeType="application/json"
                )
            ]

        @self.server.read_resource()
        async def handle_read_resource(uri: str) -> str:
            """Serve resource contents; only the language list is defined."""
            if uri == "translation://languages":
                languages = {
                    "supported_languages": [
                        {"code": "en", "name": "English"},
                        {"code": "zh", "name": "中文"},
                        {"code": "ja", "name": "日本語"},
                        {"code": "ko", "name": "한국어"},
                        {"code": "es", "name": "Español"},
                        {"code": "fr", "name": "Français"},
                        {"code": "de", "name": "Deutsch"},
                        {"code": "it", "name": "Italiano"},
                        {"code": "pt", "name": "Português"},
                        {"code": "ru", "name": "Русский"},
                        {"code": "ar", "name": "العربية"},
                        {"code": "hi", "name": "हिन्दी"},
                        {"code": "th", "name": "ไทย"},
                        {"code": "vi", "name": "Tiếng Việt"},
                        {"code": "id", "name": "Bahasa Indonesia"},
                        {"code": "ms", "name": "Bahasa Melayu"},
                        {"code": "tl", "name": "Filipino"},
                        {"code": "tr", "name": "Türkçe"},
                        {"code": "pl", "name": "Polski"},
                        {"code": "nl", "name": "Nederlands"}
                    ]
                }
                return json.dumps(languages, ensure_ascii=False, indent=2)
            else:
                raise ValueError(f"Unknown resource: {uri}")

    async def run(self):
        """Run the MCP server over stdio until the stream closes."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
def main():
    """CLI entry point: print usage for -h/--help, otherwise start the server.

    Exits with status 1 on any startup error other than Ctrl-C.
    """
    # Handle -h/--help first so help output is not polluted by the
    # missing-API-key warning (previously the warning printed before
    # the help check).
    if len(sys.argv) > 1 and sys.argv[1] in ["-h", "--help"]:
        print(__doc__)
        print("\n环境变量:")
        print(" OPENAI_API_KEY: OpenAI API 密钥 (必需,除非使用本地模型)")
        # Document the other env vars the program actually reads.
        print(" OPENAI_MODEL: 默认模型名称 (可选, 默认: gpt-3.5-turbo)")
        print(" OPENAI_BASE_URL: OpenAI 兼容 API 地址 (可选)")
        print("\n示例:")
        print(" # 使用 OpenAI 官方 API")
        print(" export OPENAI_API_KEY=your_api_key")
        print(" uvx translation_mcp.py")
        return
    # Warn but keep going: OpenAITranslator raises ValueError below if the
    # key is genuinely required, and that is reported via the except branch.
    if not os.getenv("OPENAI_API_KEY"):
        print("警告: 未设置 OPENAI_API_KEY 环境变量")
        print("请设置环境变量: export OPENAI_API_KEY=your_api_key")
        print("或者在调用时通过 base_url 参数使用本地模型")
        print()
    try:
        server = TranslationMCPServer()
        # Log to stderr: stdout is reserved for the MCP stdio transport.
        print("启动翻译 MCP 服务器...", file=sys.stderr)
        asyncio.run(server.run())
    except KeyboardInterrupt:
        print("\n服务器已停止", file=sys.stderr)
    except Exception as e:
        print(f"错误: {e}", file=sys.stderr)
        sys.exit(1)
# Entry-point guard: start the server only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()