Skip to main content
Glama
sumik5
by sumik5
main.py20.5 kB
#!/usr/bin/env python3 """ SRT翻訳MCPサーバー fastmcp最新版を使用したMCPサーバー実装 使用例: 1. 基本的な翻訳: translated = translate_srt(srt_content=content) 2. カスタム設定: translated = translate_srt( srt_content=content, model_name="llama-3.2-3b", chunk_size=500 ) """ import os import sys import logging import re import json from pathlib import Path from typing import Optional, List, Tuple, Dict from datetime import datetime # プロジェクトルートをPythonパスに追加 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from fastmcp import FastMCP # モジュールインポート from modules.srt_parser import SRTParser, Subtitle from modules.translator import Translator # ログ設定 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # 環境変数からデフォルト値を取得 DEFAULT_LM_STUDIO_URL = os.getenv('LM_STUDIO_URL', 'http://localhost:1234/v1') DEFAULT_MODEL_NAME = os.getenv('LM_MODEL_NAME', '') DEFAULT_CHUNK_SIZE = int(os.getenv('CHUNK_SIZE', '1000')) # FastMCPサーバーインスタンスを作成 mcp = FastMCP( "translate-srt", description="SRT字幕ファイルを日本語に翻訳するMCPサーバー。LM Studioの翻訳モデルを使用して高品質な字幕翻訳を提供します。" ) # 翻訳統計を保持 translation_stats = { "total_translations": 0, "total_characters": 0, "total_subtitles": 0, "last_translation": None, "errors": 0 } def parse_srt_string(srt_content: str) -> List[Tuple[int, str, str, str]]: """ SRT文字列をパースして字幕エントリのリストを返す Returns: List of tuples: (index, start_time, end_time, text) """ entries = [] blocks = re.split(r'\n\s*\n', srt_content.strip()) for block in blocks: if not block.strip(): continue lines = block.strip().split('\n') if len(lines) < 3: continue try: # インデックス index = int(lines[0].strip()) # タイムコード time_match = re.match(r'(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})', lines[1]) if not time_match: continue start_time, end_time = time_match.groups() # テキスト text = '\n'.join(lines[2:]) entries.append((index, start_time, end_time, text)) except (ValueError, IndexError): continue return entries def split_srt_into_chunks(srt_content: str, chunk_size: int) -> List[str]: """ SRT文字列をチャンクに分割(字幕ブロックを分断しない) Args: srt_content: SRT形式の文字列 chunk_size: 各チャンクの最大文字数 Returns: チャンクに分割されたSRT文字列のリスト """ entries = parse_srt_string(srt_content) if not entries: return [srt_content] chunks = [] current_chunk = [] current_size = 0 for entry in entries: index, start_time, end_time, text = entry # エントリーをSRT形式に変換 entry_text = f"{index}\n{start_time} --> {end_time}\n{text}\n\n" entry_size = len(entry_text) # チャンクサイズを超えそうな場合、現在のチャンクを保存して新しいチャンクを開始 if current_size + entry_size > chunk_size and current_chunk: chunks.append(''.join(current_chunk).strip()) current_chunk = [] current_size = 0 current_chunk.append(entry_text) current_size += entry_size # 最後のチャンクを追加 if current_chunk: chunks.append(''.join(current_chunk).strip()) return chunks if chunks else [srt_content] def merge_translated_chunks(translated_chunks: List[str]) -> str: """ 翻訳されたチャンクを結合して1つのSRT文字列にする """ # 各チャンクの最後に空行を追加して結合 merged = '\n\n'.join(chunk.strip() for chunk in translated_chunks if chunk.strip()) return merged @mcp.tool( description="""SRT字幕テキストを日本語に翻訳してSRT形式のデータを返す。 使用例: 1. ファイルから読み込んで翻訳: srt_content = read_file("movie.srt") translated = translate_srt(srt_content=srt_content) write_file("movie_ja.srt", translated) 2. カスタム設定で翻訳: translated = translate_srt( srt_content=srt_content, model_name="llama-3.2-3b", chunk_size=500 ) 注意事項: - LM Studioが起動している必要があります - model_nameを指定しない場合は環境変数LM_MODEL_NAMEが必要です - 大きなファイルはchunk_sizeで分割処理されます - タイムスタンプは保持されます""" ) async def translate_srt( srt_content: str, lm_studio_url: Optional[str] = None, model_name: Optional[str] = None, chunk_size: Optional[int] = None, preserve_formatting: bool = True ) -> str: """ SRT字幕テキストを日本語に翻訳してSRT形式のデータを返す Args: srt_content: 翻訳対象のSRT形式テキスト lm_studio_url: LM StudioのAPI URL (省略時は環境変数LM_STUDIO_URLを使用) model_name: 使用する翻訳モデル名 (省略時は環境変数LM_MODEL_NAMEを使用) chunk_size: チャンクサイズ (省略時は環境変数CHUNK_SIZEまたはデフォルト1000を使用) preserve_formatting: 元の書式(改行、スペース等)を保持するか Returns: str: 翻訳結果のSRT形式データ Raises: ValueError: 無効なパラメータが指定された、またはモデル名が環境変数に設定されていない ConnectionError: LM StudioのAPIに接続できない RuntimeError: 翻訳処理中にエラーが発生した """ # デフォルト値を使用 if lm_studio_url is None: lm_studio_url = DEFAULT_LM_STUDIO_URL logger.info(f"Using LM Studio URL from environment: {lm_studio_url}") if model_name is None: model_name = DEFAULT_MODEL_NAME if not model_name: raise ValueError("Model name is required. Set LM_MODEL_NAME environment variable or provide model_name parameter.") logger.info(f"Using model from environment: {model_name}") if chunk_size is None: chunk_size = DEFAULT_CHUNK_SIZE logger.info(f"Using chunk size: {chunk_size}") # 統計情報を更新 global translation_stats translation_stats["total_translations"] += 1 translation_stats["last_translation"] = datetime.now().isoformat() logger.info(f"Translation started (Translation #{translation_stats['total_translations']})") logger.info(f"LM Studio URL: {lm_studio_url}") logger.info(f"Model: {model_name}") logger.info(f"Input length: {len(srt_content)} characters") logger.info(f"Preserve formatting: {preserve_formatting}") try: # SRT文字列をチャンクに分割 chunks = split_srt_into_chunks(srt_content, chunk_size) logger.info(f"Split into {len(chunks)} chunks") # 各チャンクを翻訳 translator = Translator( lm_studio_url=lm_studio_url, model_name=model_name ) translated_chunks = [] for i, chunk in enumerate(chunks, 1): logger.info(f"Translating chunk {i}/{len(chunks)} ({len(chunk)} characters)...") # SRTパーサーを使用してSubtitleオブジェクトに変換 srt_parser = SRTParser() # 一時的にチャンクをファイルに保存せずに処理 # parse_srtメソッドをオーバーライドする代わりに、直接Subtitleオブジェクトを作成 entries = parse_srt_string(chunk) subtitles = [] for index, start_time, end_time, text in entries: subtitles.append(Subtitle( index=index, start_time=start_time, end_time=end_time, text=text )) # 翻訳実行 if subtitles: translated_subtitles = await translator.translate_subtitles(subtitles) # 翻訳結果をSRT形式に変換 translated_chunk = srt_parser.generate_srt_string(translated_subtitles) translated_chunks.append(translated_chunk) else: # 空のチャンクの場合はそのまま追加 translated_chunks.append(chunk) # 翻訳されたチャンクを結合 result = merge_translated_chunks(translated_chunks) # 統計情報を更新 translation_stats["total_characters"] += len(srt_content) translation_stats["total_subtitles"] += len(parse_srt_string(srt_content)) logger.info(f"Translation completed successfully") logger.info(f"Output length: {len(result)} characters") return result except Exception as e: translation_stats["errors"] += 1 logger.error(f"Translation failed: {e}") raise RuntimeError(f"Translation process failed: {str(e)}") from e @mcp.tool( description="サーバー情報と統計を取得" ) async def get_server_info() -> dict: """ サーバー情報と統計を取得 Returns: dict: サーバーの名前、バージョン、統計情報 """ return { "name": "translate-srt", "version": "0.2.0", "description": "SRT subtitle translation MCP server with LM Studio integration", "fastmcp_version": "2.11.3", "configuration": { "default_lm_studio_url": DEFAULT_LM_STUDIO_URL, "default_model_name": DEFAULT_MODEL_NAME or "(not set - must provide)", "default_chunk_size": DEFAULT_CHUNK_SIZE }, "statistics": translation_stats, "capabilities": [ "SRT format parsing and generation", "Chunk-based translation for large files", "Preserve timing information", "Support for multi-line subtitles", "Automatic retry on failure", "LM Studio integration" ], "supported_languages": { "source": "Any (auto-detect)", "target": "Japanese" } } @mcp.tool( description="""SRTファイルの検証と分析を行う。 字幕数、総時間、平均文字数などの統計情報を提供します。""" ) async def analyze_srt( srt_content: str, detailed: bool = False ) -> dict: """ SRTファイルの内容を分析 Args: srt_content: 分析対象のSRT形式テキスト detailed: 詳細分析を行うか Returns: dict: 分析結果 """ try: entries = parse_srt_string(srt_content) if not entries: return { "valid": False, "error": "No valid SRT entries found", "subtitle_count": 0 } # 基本統計 subtitle_count = len(entries) total_chars = sum(len(entry[3]) for entry in entries) avg_chars = total_chars / subtitle_count if subtitle_count > 0 else 0 # 時間計算 def parse_time(time_str: str) -> float: """時間文字列を秒に変換""" parts = time_str.replace(',', '.').split(':') return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2]) first_start = parse_time(entries[0][1]) last_end = parse_time(entries[-1][2]) total_duration = last_end - first_start result = { "valid": True, "subtitle_count": subtitle_count, "total_characters": total_chars, "average_characters": round(avg_chars, 1), "total_duration_seconds": round(total_duration, 2), "duration_formatted": f"{int(total_duration//60)}:{int(total_duration%60):02d}", "first_timestamp": entries[0][1], "last_timestamp": entries[-1][2] } if detailed: # 詳細分析 line_counts = [len(entry[3].split('\n')) for entry in entries] char_counts = [len(entry[3]) for entry in entries] result["detailed_stats"] = { "max_lines_per_subtitle": max(line_counts), "avg_lines_per_subtitle": round(sum(line_counts) / len(line_counts), 1), "max_characters": max(char_counts), "min_characters": min(char_counts), "empty_subtitles": sum(1 for c in char_counts if c == 0), "multi_line_subtitles": sum(1 for l in line_counts if l > 1) } # 言語検出(簡易版) text_sample = ' '.join(entry[3] for entry in entries[:50]) # 最初の50個をサンプル has_japanese = bool(re.search(r'[\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF]', text_sample)) has_english = bool(re.search(r'[a-zA-Z]{3,}', text_sample)) result["detected_languages"] = { "japanese": has_japanese, "english": has_english, "other": not (has_japanese or has_english) } return result except Exception as e: return { "valid": False, "error": f"Analysis failed: {str(e)}", "subtitle_count": 0 } @mcp.tool( description="""LM Studioの接続状態を確認する。 APIの到達可能性とモデルの利用可能性をチェックします。""" ) async def check_lm_studio_status( lm_studio_url: Optional[str] = None, model_name: Optional[str] = None ) -> dict: """ LM Studioの接続状態を確認 Args: lm_studio_url: LM StudioのAPI URL (省略時は環境変数を使用) model_name: 確認するモデル名 (省略時は環境変数を使用) Returns: dict: 接続状態とモデル情報 """ import httpx # デフォルト値を使用 if lm_studio_url is None: lm_studio_url = DEFAULT_LM_STUDIO_URL if model_name is None: model_name = DEFAULT_MODEL_NAME # URLの整形 base_url = lm_studio_url.rstrip('/') if '/v1' not in base_url: base_url = base_url + '/v1' result = { "lm_studio_url": base_url, "model_name": model_name or "(not configured)", "api_reachable": False, "models_endpoint_available": False, "model_available": False, "available_models": [], "error": None } try: async with httpx.AsyncClient(timeout=5.0) as client: # APIの到達可能性を確認 try: response = await client.get(f"{base_url}/models") response.raise_for_status() result["api_reachable"] = True result["models_endpoint_available"] = True # 利用可能なモデルのリストを取得 models_data = response.json() if "data" in models_data: available_models = [model["id"] for model in models_data["data"]] result["available_models"] = available_models # 指定されたモデルが利用可能か確認 if model_name and model_name in available_models: result["model_available"] = True elif model_name: result["error"] = f"Model '{model_name}' not found in available models" except httpx.HTTPStatusError as e: result["error"] = f"HTTP Error {e.response.status_code}: Unable to reach models endpoint" except httpx.RequestError as e: result["error"] = f"Connection Error: {str(e)}" except Exception as e: result["error"] = f"Unexpected error: {str(e)}" # 推奨事項を追加 if not result["api_reachable"]: result["recommendation"] = "Please ensure LM Studio is running and accessible at the specified URL" elif not result["models_endpoint_available"]: result["recommendation"] = "LM Studio is reachable but the models endpoint is not available" elif not result["model_available"] and model_name: result["recommendation"] = f"Please load the model '{model_name}' in LM Studio or use one of the available models: {', '.join(result['available_models'][:3])}" else: result["recommendation"] = "LM Studio is properly configured and ready for translation" return result @mcp.tool( description="""字幕の簡易プレビューを生成する。 最初と最後の数個の字幕を表示して内容を確認できます。""" ) async def preview_srt( srt_content: str, num_entries: int = 5, show_start: bool = True, show_end: bool = True ) -> dict: """ SRT字幕のプレビューを生成 Args: srt_content: プレビュー対象のSRT形式テキスト num_entries: 表示する字幕の数(開始と終了それぞれ) show_start: 最初の字幕を表示するか show_end: 最後の字幕を表示するか Returns: dict: プレビュー結果 """ try: entries = parse_srt_string(srt_content) if not entries: return { "success": False, "error": "No valid SRT entries found", "total_entries": 0 } preview = { "success": True, "total_entries": len(entries), "preview_entries": {} } # 開始部分のプレビュー if show_start: start_entries = [] for i in range(min(num_entries, len(entries))): entry = entries[i] start_entries.append({ "index": entry[0], "time": f"{entry[1]} --> {entry[2]}", "text": entry[3], "char_count": len(entry[3]) }) preview["preview_entries"]["start"] = start_entries # 終了部分のプレビュー if show_end and len(entries) > num_entries: end_entries = [] start_idx = max(0, len(entries) - num_entries) for i in range(start_idx, len(entries)): entry = entries[i] end_entries.append({ "index": entry[0], "time": f"{entry[1]} --> {entry[2]}", "text": entry[3], "char_count": len(entry[3]) }) preview["preview_entries"]["end"] = end_entries # 中間部分のサンプル(オプション) if len(entries) > num_entries * 3: middle_idx = len(entries) // 2 middle_entry = entries[middle_idx] preview["preview_entries"]["middle_sample"] = { "index": middle_entry[0], "time": f"{middle_entry[1]} --> {middle_entry[2]}", "text": middle_entry[3], "char_count": len(middle_entry[3]) } return preview except Exception as e: return { "success": False, "error": f"Preview generation failed: {str(e)}", "total_entries": 0 } def main(): """メインエントリーポイント""" # MCPサーバーを起動(stdioトランスポート使用) mcp.run() if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sumik5/translate-srt-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server