
Translation MCP Server

by Barnettxxf
translation_mcp.py (16.7 kB)
#!/usr/bin/env python3
"""
Text translation MCP service.

A text translation service based on the Model Context Protocol (MCP) that
supports OpenAI and OpenAI-API-compatible models.

Usage:
    uvx translation_mcp.py
or:
    python translation_mcp.py
"""

import asyncio
import json
import os
import sys
from typing import Any, Dict, List, Optional

from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, TextContent, Tool


class OpenAITranslator:
    """Translator backed by OpenAI-compatible chat models."""

    def __init__(self):
        self.default_api_key = os.getenv("OPENAI_API_KEY")
        self.default_model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
        self.default_base_url = os.getenv("OPENAI_BASE_URL")
        if not self.default_api_key:
            raise ValueError("OPENAI_API_KEY environment variable is not set")

    def _create_llm(self, model: str, base_url: Optional[str] = None) -> ChatOpenAI:
        """Create an LLM instance."""
        kwargs = {
            "model": model,
            "api_key": self.default_api_key,
            "temperature": 0.1,
            "max_tokens": 2000,
        }
        if base_url:
            kwargs["base_url"] = base_url
        return ChatOpenAI(**kwargs)

    async def translate(
        self,
        text: str,
        target_language: str,
        source_language: Optional[str] = None,
        model: Optional[str] = None,
        base_url: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Translate text with an OpenAI-compatible model."""
        try:
            # Fall back to the default model / base_url when none is given.
            actual_model = model or self.default_model
            actual_base_url = base_url or self.default_base_url
            llm = self._create_llm(actual_model, actual_base_url)

            # Build the prompt.
            if source_language:
                prompt = (
                    f"Translate the following {source_language} text into "
                    f"{target_language}; return only the translation:\n\n{text}"
                )
            else:
                prompt = (
                    f"Translate the following text into {target_language}; "
                    f"return only the translation:\n\n{text}"
                )

            messages = [
                SystemMessage(
                    content="You are a professional translation assistant; "
                    "provide accurate, natural translations."
                ),
                HumanMessage(content=prompt),
            ]

            response = await llm.ainvoke(messages)
            translated_text = response.content.strip()

            return {
                "translated_text": translated_text,
                "source_language": source_language or "auto",
                "target_language": target_language,
                # Report the model/base_url actually used, not the raw
                # arguments (which are None when the defaults applied).
                "model": actual_model,
                "base_url": actual_base_url,
                "success": True,
            }
        except Exception as e:
            return {"error": str(e), "success": False}

    async def detect_language(
        self,
        text: str,
        model: Optional[str] = None,
        base_url: Optional[str] = None,
    ) -> str:
        """Detect the language of a text."""
        try:
            # Fall back to the default model / base_url when none is given.
            actual_model = model or self.default_model
            actual_base_url = base_url or self.default_base_url
            llm = self._create_llm(actual_model, actual_base_url)

            prompt = (
                "Detect the language of the following text and return only "
                f"the language code (e.g. en, zh, ja, ko):\n\n{text}"
            )
            messages = [
                SystemMessage(
                    content="You are a language-detection assistant; "
                    "return an accurate language code."
                ),
                HumanMessage(content=prompt),
            ]

            response = await llm.ainvoke(messages)
            return response.content.strip().lower()
        except Exception:
            return "unknown"

    def split_text_into_chunks(self, text: str, max_chunk_size: int = 2000) -> List[str]:
        """Split text into chunks suitable for translation."""
        lines = text.split("\n")
        chunks = []
        current_chunk = []
        current_size = 0

        for line in lines:
            line_size = len(line) + 1  # +1 for the newline

            # If adding this line would exceed the limit, flush the current chunk first.
            if current_size + line_size > max_chunk_size and current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size

        # Append the final chunk.
        if current_chunk:
            chunks.append("\n".join(current_chunk))

        return chunks

    async def translate_long_text(
        self,
        text: str,
        target_language: str,
        source_language: Optional[str] = None,
        max_chunk_size: int = 2000,
        model: Optional[str] = None,
        base_url: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Translate long text (automatically split into chunks)."""
        try:
            # Short texts are translated directly.
            if len(text) <= max_chunk_size:
                return await self.translate(
                    text, target_language, source_language, model, base_url
                )

            # Split the text and translate each chunk in turn.
            chunks = self.split_text_into_chunks(text, max_chunk_size)
            translated_chunks = []
            for i, chunk in enumerate(chunks):
                try:
                    result = await self.translate(
                        chunk, target_language, source_language, model, base_url
                    )
                    if result.get("success"):
                        translated_chunks.append(result.get("translated_text", ""))
                    else:
                        return {
                            "error": f"Failed to translate chunk {i + 1}: "
                            f"{result.get('error', 'unknown error')}",
                            "success": False,
                        }

                    # Throttle between chunks to avoid hammering the API.
                    if i < len(chunks) - 1:
                        await asyncio.sleep(0.5)
                except Exception as e:
                    return {
                        "error": f"Error while translating chunk {i + 1}: {e}",
                        "success": False,
                    }

            # Join the translated chunks back together.
            translated_text = "\n".join(translated_chunks)
            return {
                "translated_text": translated_text,
                "source_language": source_language or "auto-detected",
                "target_language": target_language,
                "chunks_count": len(chunks),
                "success": True,
            }
        except Exception as e:
            return {"error": f"Chunked translation failed: {e}", "success": False}

    def get_supported_languages(self) -> List[str]:
        """Return the list of supported language codes."""
        return [
            "en", "zh", "ja", "ko", "es", "fr", "de", "it", "pt", "ru",
            "ar", "hi", "th", "vi", "id", "ms", "tl", "tr", "pl", "nl",
        ]


class TranslationMCPServer:
    """Translation MCP server."""

    def __init__(self):
        self.server = Server("translation-mcp")
        self.translator = OpenAITranslator()
        self._setup_handlers()

    def _setup_handlers(self):
        """Register the MCP handlers."""

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """List available tools."""
            return [
                Tool(
                    name="translate_text",
                    description="Translate text into a target language",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "text": {
                                "type": "string",
                                "description": "Text to translate",
                            },
                            "target_language": {
                                "type": "string",
                                "description": "Target language code (e.g. en, zh, ja, ko)",
                            },
                            "source_language": {
                                "type": "string",
                                "description": "Source language code (optional; auto-detected)",
                            },
                        },
                        "required": ["text", "target_language"],
                    },
                ),
                Tool(
                    name="detect_language",
                    description="Detect the language of a text",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "text": {
                                "type": "string",
                                "description": "Text whose language should be detected",
                            },
                        },
                        "required": ["text"],
                    },
                ),
                Tool(
                    name="translate_long_text",
                    description="Translate long text (automatically chunked; suitable for large documents)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "text": {
                                "type": "string",
                                "description": "Long text to translate",
                            },
                            "target_language": {
                                "type": "string",
                                "description": "Target language code (e.g. en, zh, ja, ko)",
                            },
                            "source_language": {
                                "type": "string",
                                "description": "Source language code (optional; auto-detected)",
                            },
                            "max_chunk_size": {
                                "type": "integer",
                                "description": "Maximum characters per chunk (default: 2000)",
                                "default": 2000,
                            },
                        },
                        "required": ["text", "target_language"],
                    },
                ),
            ]

        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: dict) -> List[TextContent]:
            """Handle a tool call."""
            try:
                if name == "translate_text":
                    result = await self.translator.translate(
                        text=arguments["text"],
                        target_language=arguments["target_language"],
                        source_language=arguments.get("source_language"),
                    )
                    return [TextContent(
                        type="text",
                        text=json.dumps(result, ensure_ascii=False, indent=2),
                    )]
                elif name == "detect_language":
                    language = await self.translator.detect_language(
                        text=arguments["text"]
                    )
                    result = {"detected_language": language, "success": True}
                    return [TextContent(
                        type="text",
                        text=json.dumps(result, ensure_ascii=False, indent=2),
                    )]
                elif name == "translate_long_text":
                    result = await self.translator.translate_long_text(
                        text=arguments["text"],
                        target_language=arguments["target_language"],
                        source_language=arguments.get("source_language"),
                        max_chunk_size=arguments.get("max_chunk_size", 2000),
                    )
                    return [TextContent(
                        type="text",
                        text=json.dumps(result, ensure_ascii=False, indent=2),
                    )]
                else:
                    raise ValueError(f"Unknown tool: {name}")
            except Exception as e:
                error_result = {"error": str(e), "success": False}
                return [TextContent(
                    type="text",
                    text=json.dumps(error_result, ensure_ascii=False, indent=2),
                )]

        @self.server.list_resources()
        async def handle_list_resources() -> List[Resource]:
            """List available resources."""
            return [
                Resource(
                    uri="translation://languages",
                    name="Supported languages",
                    description="All supported language codes and names",
                    mimeType="application/json",
                )
            ]

        @self.server.read_resource()
        async def handle_read_resource(uri: str) -> str:
            """Read a resource."""
            if uri == "translation://languages":
                languages = {
                    "supported_languages": [
                        {"code": "en", "name": "English"},
                        {"code": "zh", "name": "中文"},
                        {"code": "ja", "name": "日本語"},
                        {"code": "ko", "name": "한국어"},
                        {"code": "es", "name": "Español"},
                        {"code": "fr", "name": "Français"},
                        {"code": "de", "name": "Deutsch"},
                        {"code": "it", "name": "Italiano"},
                        {"code": "pt", "name": "Português"},
                        {"code": "ru", "name": "Русский"},
                        {"code": "ar", "name": "العربية"},
                        {"code": "hi", "name": "हिन्दी"},
                        {"code": "th", "name": "ไทย"},
                        {"code": "vi", "name": "Tiếng Việt"},
                        {"code": "id", "name": "Bahasa Indonesia"},
                        {"code": "ms", "name": "Bahasa Melayu"},
                        {"code": "tl", "name": "Filipino"},
                        {"code": "tr", "name": "Türkçe"},
                        {"code": "pl", "name": "Polski"},
                        {"code": "nl", "name": "Nederlands"},
                    ]
                }
                return json.dumps(languages, ensure_ascii=False, indent=2)
            else:
                raise ValueError(f"Unknown resource: {uri}")

    async def run(self):
        """Run the MCP server over stdio."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options(),
            )


def main():
    """Entry point."""
    # Check the environment.
    if not os.getenv("OPENAI_API_KEY"):
        print("Warning: the OPENAI_API_KEY environment variable is not set")
        print("Set it with: export OPENAI_API_KEY=your_api_key")
        print("or use a local model via the base_url parameter when calling")
        print()

    # Print usage instructions.
    if len(sys.argv) > 1 and sys.argv[1] in ["-h", "--help"]:
        print(__doc__)
        print("\nEnvironment variables:")
        print("  OPENAI_API_KEY: OpenAI API key (required unless using a local model)")
        print("\nExample:")
        print("  # Use the official OpenAI API")
        print("  export OPENAI_API_KEY=your_api_key")
        print("  uvx translation_mcp.py")
        return

    # Start the server.
    try:
        server = TranslationMCPServer()
        print("Starting translation MCP server...", file=sys.stderr)
        asyncio.run(server.run())
    except KeyboardInterrupt:
        print("\nServer stopped", file=sys.stderr)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
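The translator class can also be exercised directly, without an MCP client, which is handy for checking the chunking logic and one model round-trip. The sketch below is a minimal, hypothetical smoke test: it assumes translation_mcp.py sits alongside it on the import path and that OPENAI_API_KEY (and optionally OPENAI_MODEL / OPENAI_BASE_URL) is set; the sample text, chunk size, and language code are illustrative only.

# smoke_test.py - minimal sketch; not part of the server itself.
import asyncio

from translation_mcp import OpenAITranslator


async def main():
    translator = OpenAITranslator()  # raises if OPENAI_API_KEY is unset

    # Pure-Python chunking; no API call involved. Splitting happens at line
    # boundaries, so here each line becomes its own chunk.
    chunks = translator.split_text_into_chunks(
        "line one\nline two\nline three", max_chunk_size=10
    )
    print(chunks)  # ['line one', 'line two', 'line three']

    # One round-trip through the model; needs a valid key or a local
    # OpenAI-compatible endpoint via OPENAI_BASE_URL.
    result = await translator.translate("Hello, world!", target_language="zh")
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

Note that split_text_into_chunks never breaks within a line, so a single line longer than max_chunk_size still ends up as one oversized chunk; the limit is a soft target, not a hard cap.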

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Barnettxxf/translation_mcp'
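The same lookup can be scripted from Python using only the standard library, as in this sketch (the endpoint is the one shown above; the response schema is whatever the API returns and is not assumed here):

# fetch_server_meta.py - minimal sketch using only the standard library.
import json
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/Barnettxxf/translation_mcp"

with urllib.request.urlopen(URL) as resp:
    # Pretty-print the raw JSON document; the exact schema is defined by the API.
    print(json.dumps(json.loads(resp.read()), indent=2))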

If you have feedback or need assistance with the MCP directory API, please join our Discord server.