Skip to main content
Glama

JianYing MCP

by hey-jian-wei
index_manager.py11.2 kB
# -*- coding: utf-8 -*- """ Author: jian wei File Name: index_manager.py """ import json import os import uuid from typing import Dict, Any, Optional from dotenv import load_dotenv # 加载环境变量 load_dotenv() # 获取环境变量 SAVE_PATH = os.getenv('SAVE_PATH') class IndexManager: """ 索引管理器 负责维护草稿ID、轨道ID、片段ID之间的映射关系 """ def __init__(self): self.index_file_path = os.path.join(SAVE_PATH, "global_index.json") self._ensure_index_file() def _ensure_index_file(self): """确保索引文件存在""" if not os.path.exists(self.index_file_path): # 创建空的索引结构 empty_index = { "draft_mappings": {}, "track_mappings": {}, "video_segment_mappings": {}, "audio_segment_mappings": {}, "text_segment_mappings": {} } self._save_index(empty_index) def _load_index(self) -> Dict[str, Any]: """加载索引数据""" try: with open(self.index_file_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"加载索引文件失败: {e}") return { "draft_mappings": {}, "track_mappings": {}, "video_segment_mappings": {}, "audio_segment_mappings": {}, "text_segment_mappings": {} } def _save_index(self, index_data: Dict[str, Any]): """保存索引数据""" try: # 确保目录存在 os.makedirs(os.path.dirname(self.index_file_path), exist_ok=True) with open(self.index_file_path, 'w', encoding='utf-8') as f: json.dump(index_data, f, ensure_ascii=False, indent=2) except Exception as e: print(f"保存索引文件失败: {e}") # ==================== 添加映射方法 ==================== def add_draft_mapping(self, draft_id: str, draft_info: Dict[str, Any]): """ 添加草稿映射记录 Args: draft_id: 草稿ID draft_info: 草稿信息字典 """ index_data = self._load_index() index_data["draft_mappings"][draft_id] = draft_info self._save_index(index_data) def add_track_mapping(self, track_id: str, draft_id: str, track_name: str, track_type: str): """ 添加轨道映射 Args: track_id: 轨道ID draft_id: 草稿ID track_name: 轨道名称 track_type: 轨道类型 (video/audio/text) """ index_data = self._load_index() index_data["track_mappings"][track_id] = { "draft_id": draft_id, "track_name": track_name, "track_type": track_type } self._save_index(index_data) def add_video_segment_mapping(self, video_segment_id: str, track_id: str): """ 添加视频片段映射 Args: video_segment_id: 视频片段ID track_id: 轨道ID """ index_data = self._load_index() # 通过track_id获取draft_id draft_id = self.get_draft_id_by_track_id(track_id) if draft_id: index_data["video_segment_mappings"][video_segment_id] = { "draft_id": draft_id, "track_id": track_id } self._save_index(index_data) def add_audio_segment_mapping(self, audio_segment_id: str, track_id: str): """ 添加音频片段映射 Args: audio_segment_id: 音频片段ID track_id: 轨道ID """ index_data = self._load_index() # 通过track_id获取draft_id draft_id = self.get_draft_id_by_track_id(track_id) if draft_id: index_data["audio_segment_mappings"][audio_segment_id] = { "draft_id": draft_id, "track_id": track_id } self._save_index(index_data) def add_text_segment_mapping(self, text_segment_id: str, track_id: str): """ 添加文本片段映射 Args: text_segment_id: 文本片段ID track_id: 轨道ID """ index_data = self._load_index() # 通过track_id获取draft_id draft_id = self.get_draft_id_by_track_id(track_id) if draft_id: index_data["text_segment_mappings"][text_segment_id] = { "draft_id": draft_id, "track_id": track_id } self._save_index(index_data) def get_draft_info(self, draft_id: str) -> Optional[Dict[str, Any]]: """ 获取草稿完整信息 Args: draft_id: 草稿ID Returns: 草稿信息字典,如果不存在返回None """ index_data = self._load_index() return index_data["draft_mappings"].get(draft_id) def get_draft_id_by_track_id(self, track_id: str) -> Optional[str]: """ 通过轨道ID获取草稿ID Args: track_id: 轨道ID Returns: 草稿ID,如果不存在返回None """ index_data = self._load_index() track_info = index_data["track_mappings"].get(track_id) return track_info.get("draft_id") if track_info else None def get_draft_id_by_video_segment_id(self, video_segment_id: str) -> Optional[str]: """ 通过视频片段ID获取草稿ID Args: video_segment_id: 视频片段ID Returns: 草稿ID,如果不存在返回None """ index_data = self._load_index() segment_info = index_data["video_segment_mappings"].get(video_segment_id) return segment_info.get("draft_id") if segment_info else None def get_track_id_by_video_segment_id(self, video_segment_id: str) -> Optional[str]: """ 通过视频片段ID获取轨道ID Args: video_segment_id: 视频片段ID Returns: 轨道ID,如果不存在返回None """ index_data = self._load_index() segment_info = index_data["video_segment_mappings"].get(video_segment_id) return segment_info.get("track_id") if segment_info else None def get_track_name_by_track_id(self, track_id: str) -> Optional[str]: """ 通过轨道ID获取轨道名 Args: track_id: 轨道ID Returns: 轨道名,如果不存在返回None """ index_data = self._load_index() track_info = index_data["track_mappings"].get(track_id) return track_info.get("track_name") if track_info else None def get_track_info_by_video_segment_id(self, video_segment_id: str) -> Optional[Dict[str, Any]]: """ 通过视频片段ID获取完整轨道信息 Args: video_segment_id: 视频片段ID Returns: 轨道信息字典,如果不存在返回None """ track_id = self.get_track_id_by_video_segment_id(video_segment_id) if track_id: index_data = self._load_index() return index_data["track_mappings"].get(track_id) return None def get_track_info_by_track_id(self, track_id: str) -> Optional[Dict[str, Any]]: """ 通过轨道ID获取完整轨道信息 Args: track_id: 轨道ID Returns: 轨道信息字典,如果不存在返回None """ index_data = self._load_index() return index_data["track_mappings"].get(track_id) def get_draft_id_by_text_segment_id(self, text_segment_id: str) -> Optional[str]: """ 通过文本片段ID获取草稿ID Args: text_segment_id: 文本片段ID Returns: 草稿ID,如果不存在返回None """ index_data = self._load_index() segment_info = index_data["text_segment_mappings"].get(text_segment_id) return segment_info.get("draft_id") if segment_info else None def get_track_id_by_text_segment_id(self, text_segment_id: str) -> Optional[str]: """ 通过文本片段ID获取轨道ID Args: text_segment_id: 文本片段ID Returns: 轨道ID,如果不存在返回None """ index_data = self._load_index() segment_info = index_data["text_segment_mappings"].get(text_segment_id) return segment_info.get("track_id") if segment_info else None def get_track_info_by_text_segment_id(self, text_segment_id: str) -> Optional[Dict[str, Any]]: """ 通过文本片段ID获取完整轨道信息 Args: text_segment_id: 文本片段ID Returns: 轨道信息字典,如果不存在返回None """ track_id = self.get_track_id_by_text_segment_id(text_segment_id) if track_id: index_data = self._load_index() return index_data["track_mappings"].get(track_id) return None def get_draft_id_by_audio_segment_id(self, audio_segment_id: str) -> Optional[str]: """ 通过音频片段ID获取草稿ID Args: audio_segment_id: 音频片段ID Returns: 草稿ID,如果不存在返回None """ index_data = self._load_index() segment_info = index_data["audio_segment_mappings"].get(audio_segment_id) return segment_info.get("draft_id") if segment_info else None def get_track_id_by_audio_segment_id(self, audio_segment_id: str) -> Optional[str]: """ 通过音频片段ID获取轨道ID Args: audio_segment_id: 音频片段ID Returns: 轨道ID,如果不存在返回None """ index_data = self._load_index() segment_info = index_data["audio_segment_mappings"].get(audio_segment_id) return segment_info.get("track_id") if segment_info else None def get_track_info_by_audio_segment_id(self, audio_segment_id: str) -> Optional[Dict[str, Any]]: """ 通过音频片段ID获取完整轨道信息 Args: audio_segment_id: 音频片段ID Returns: 轨道信息字典,如果不存在返回None """ track_id = self.get_track_id_by_audio_segment_id(audio_segment_id) if track_id: index_data = self._load_index() return index_data["track_mappings"].get(track_id) return None # ==================== 工具方法 ==================== def generate_id(self) -> str: """生成新的UUID""" return str(uuid.uuid4()) # 全局索引管理器实例 index_manager = IndexManager()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hey-jian-wei/jianying-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server