Skip to main content
Glama
template_service.py15.7 kB
"""Template management service.

Provides template loading, rendering, validation and management features.
"""

import json
import logging
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    from jinja2 import (
        Environment,
        FileSystemLoader,
        Template,
        TemplateError,
        TemplateSyntaxError,
    )
except ImportError as exc:
    raise ImportError("需要安装jinja2包: pip install jinja2>=3.1.0") from exc

from data_access import FileSystemInterface

logger = logging.getLogger(__name__)


class TemplateService:
    """Load, render, validate and manage Jinja2 templates stored in a directory."""

    # Upper bound on the number of rendered results kept in memory.
    _MAX_CACHE_ENTRIES = 100

    def __init__(
        self,
        template_dir: Optional[str] = None,
        file_system: Optional[FileSystemInterface] = None,
    ):
        """Initialise the template service.

        Args:
            template_dir: Path of the template directory. When omitted, the
                ``config/templates`` directory three levels above this file
                is used.
            file_system: File system interface; stored on the instance but
                not otherwise used by this class.
        """
        self.file_system = file_system
        self.template_dir = (
            Path(template_dir)
            if template_dir
            else Path(__file__).parent.parent.parent / "config" / "templates"
        )

        # Make sure the template directory exists.
        self.template_dir.mkdir(parents=True, exist_ok=True)

        # Set up the Jinja2 environment.
        self.env = Environment(
            loader=FileSystemLoader(str(self.template_dir)),
            trim_blocks=True,
            lstrip_blocks=True,
            keep_trailing_newline=True,
        )

        # Register the custom filters.
        self._add_custom_filters()

        # Cache of rendered output, keyed by "<template_name>:<context hash>".
        self._template_cache: Dict[str, str] = {}

    def _add_custom_filters(self):
        """Register the custom filters on the Jinja2 environment."""

        def format_datetime(timestamp: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> str:
            """Format an ISO-8601 timestamp; return it unchanged if unparsable."""
            try:
                dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                return dt.strftime(format_str)
            except (ValueError, TypeError, AttributeError) as e:
                # fromisoformat raises ValueError on malformed input; non-string
                # input fails with AttributeError/TypeError. Fall back to the
                # raw value rather than aborting the whole render.
                logger.warning(f"操作失败: {e}")
                return timestamp

        def get_file_type_icon(file_type: str) -> str:
            """Return the icon for a file type, defaulting to the generic one."""
            icons = {
                'code': '📝',
                'documentation': '📚',
                'image': '🖼️',
                'video': '🎬',
                'audio': '🎵',
                'archive': '📦',
                'config': '⚙️',
                'other': '📄',
            }
            return icons.get(file_type, '📄')

        def truncate_text(text: str, length: int = 50) -> str:
            """Cut text to ``length`` characters, appending "..." when shortened."""
            if len(text) <= length:
                return text
            return text[:length] + "..."

        # Register the filters. The size filter reuses the method so that the
        # filter and get_template_info() always format sizes identically.
        # NOTE: 'truncate' replaces the filter of the same name that Jinja2
        # ships with.
        self.env.filters['format_size'] = self._format_size
        self.env.filters['format_datetime'] = format_datetime
        self.env.filters['file_icon'] = get_file_type_icon
        self.env.filters['truncate'] = truncate_text

    def _resolve_template_path(self, template_name: str) -> Path:
        """Return the path of a template, refusing names outside the template dir.

        The check is purely lexical (no symlink resolution): the normalised
        absolute path must stay below the normalised template directory.

        Args:
            template_name: Template name, relative to the template directory.

        Returns:
            ``self.template_dir / template_name``.

        Raises:
            ValueError: If the name escapes the template directory (for
                example through ``..`` segments or an absolute path).
        """
        root = Path(os.path.abspath(self.template_dir))
        candidate = Path(os.path.abspath(root / template_name))
        try:
            candidate.relative_to(root)
        except ValueError:
            raise ValueError(f"模板路径超出模板目录: {template_name}") from None
        return self.template_dir / template_name

    async def render_template(self, template_name: str, context: Dict[str, Any]) -> str:
        """Render a template file.

        Args:
            template_name: Template name.
            context: Template context data.

        Returns:
            The rendered content.

        Raises:
            ValueError: On template syntax or rendering errors.
            RuntimeError: On other ``RuntimeError``/``ValueError`` failures.
        """
        try:
            # Check the cache first.
            # NOTE: a cached result also freezes the default 'now' value that
            # was current when the entry was created.
            cache_key = f"{template_name}:{hash(str(context))}"
            if cache_key in self._template_cache:
                return self._template_cache[cache_key]

            # Load the template.
            template = self.env.get_template(template_name)

            # Caller-supplied values override the defaults.
            full_context = self._get_default_context()
            full_context.update(context)

            # Render the template.
            rendered_content = template.render(**full_context)

            # Keep the cache bounded: drop the oldest entry (dicts preserve
            # insertion order) instead of refusing to cache once it is full.
            if len(self._template_cache) >= self._MAX_CACHE_ENTRIES:
                oldest_key = next(iter(self._template_cache))
                del self._template_cache[oldest_key]
            self._template_cache[cache_key] = rendered_content

            return rendered_content

        except TemplateSyntaxError as e:
            raise ValueError(f"模板语法错误: {e.message} (行 {e.lineno})") from e
        except TemplateError as e:
            raise ValueError(f"模板渲染错误: {e.message}") from e
        except (RuntimeError, ValueError) as e:
            raise RuntimeError(f"渲染模板失败: {e}") from e

    def _get_default_context(self) -> Dict[str, Any]:
        """Return the default context merged into every render."""
        filters = self.env.filters
        return {
            'now': datetime.now(),
            'generator': 'Optimized Folder Docs MCP Server',
            'version': '2.0.0',
            'functions': {
                'format_size': filters['format_size'],
                'format_datetime': filters['format_datetime'],
                'file_icon': filters['file_icon'],
                'truncate': filters['truncate'],
            },
        }

    async def render_template_string(self, template_string: str, context: Dict[str, Any]) -> str:
        """Render a template given as a string.

        Args:
            template_string: Template source.
            context: Template context data.

        Returns:
            The rendered content.

        Raises:
            ValueError: On template syntax or rendering errors.
            RuntimeError: On other ``RuntimeError``/``ValueError`` failures.
        """
        try:
            template = self.env.from_string(template_string)

            # Caller-supplied values override the defaults.
            full_context = self._get_default_context()
            full_context.update(context)

            return template.render(**full_context)

        except TemplateSyntaxError as e:
            raise ValueError(f"模板语法错误: {e.message} (行 {e.lineno})") from e
        except TemplateError as e:
            # Same mapping as render_template() so both entry points report
            # rendering failures the same way.
            raise ValueError(f"模板渲染错误: {e.message}") from e
        except (RuntimeError, ValueError) as e:
            raise RuntimeError(f"渲染模板字符串失败: {e}") from e

    def list_available_templates(self) -> List[str]:
        """List the available templates.

        Returns:
            Sorted file names of the ``*.j2`` files directly inside the
            template directory.

        Raises:
            RuntimeError: If the directory cannot be listed.
        """
        try:
            return sorted(entry.name for entry in self.template_dir.glob("*.j2"))
        except (OSError, RuntimeError, ValueError) as e:
            raise RuntimeError(f"获取模板列表失败: {e}") from e

    def template_exists(self, template_name: str) -> bool:
        """Check whether a template exists.

        Args:
            template_name: Template name.

        Returns:
            True if the name refers to a regular file inside the template
            directory; False otherwise, including for invalid names.
        """
        try:
            return self._resolve_template_path(template_name).is_file()
        except (OSError, RuntimeError, TemplateError, TypeError, ValueError):
            return False

    async def validate_template(self, template_content: str) -> Dict[str, Any]:
        """Validate template syntax.

        Args:
            template_content: Template source.

        Returns:
            A dict with the keys ``valid``, ``errors`` and ``warnings``.
        """
        try:
            # Parsing is enough to surface syntax errors; nothing is rendered.
            self.env.parse(template_content)
            return {
                'valid': True,
                'errors': [],
                'warnings': [],
            }

        except TemplateSyntaxError as e:
            return {
                'valid': False,
                'errors': [
                    {
                        'type': 'syntax_error',
                        'message': e.message,
                        'line': e.lineno,
                        'name': e.name,
                    }
                ],
                'warnings': [],
            }
        except (RuntimeError, ValueError) as e:
            return {
                'valid': False,
                'errors': [
                    {
                        'type': 'unknown_error',
                        'message': str(e),
                    }
                ],
                'warnings': [],
            }

    async def create_template(self, template_name: str, content: str, overwrite: bool = False) -> bool:
        """Create a new template.

        Args:
            template_name: Template name.
            content: Template source.
            overwrite: Whether an existing template may be replaced.

        Returns:
            True on success.

        Raises:
            RuntimeError: If the template already exists (and ``overwrite`` is
                false), the name is invalid, the syntax is invalid, or the
                file cannot be written.
        """
        try:
            template_path = self._resolve_template_path(template_name)

            # Refuse to clobber an existing file unless asked to.
            if template_path.exists() and not overwrite:
                raise ValueError(f"模板已存在: {template_name}")

            # Validate the syntax before touching the disk.
            validation = await self.validate_template(content)
            if not validation['valid']:
                raise ValueError(f"模板语法错误: {validation['errors']}")

            # Write the file.
            template_path.write_text(content, encoding='utf-8')

            # Drop cached renders of this template.
            self._clear_template_cache(template_name)

            return True

        except (OSError, RuntimeError, ValueError) as e:
            raise RuntimeError(f"创建模板失败: {e}") from e

    async def delete_template(self, template_name: str) -> bool:
        """Delete a template.

        Args:
            template_name: Template name.

        Returns:
            True on success.

        Raises:
            RuntimeError: If the template does not exist, the name is invalid,
                or the file cannot be removed.
        """
        try:
            template_path = self._resolve_template_path(template_name)

            if not template_path.exists():
                raise ValueError(f"模板不存在: {template_name}")

            # Remove the file.
            template_path.unlink()

            # Drop cached renders of this template.
            self._clear_template_cache(template_name)

            return True

        except (OSError, RuntimeError, ValueError) as e:
            raise RuntimeError(f"删除模板失败: {e}") from e

    def get_template_content(self, template_name: str) -> str:
        """Return the source of a template.

        Args:
            template_name: Template name.

        Returns:
            The template source, decoded as UTF-8.

        Raises:
            RuntimeError: If the template does not exist, the name is invalid,
                or the file cannot be read.
        """
        try:
            template_path = self._resolve_template_path(template_name)

            if not template_path.exists():
                raise ValueError(f"模板不存在: {template_name}")

            return template_path.read_text(encoding='utf-8')

        except (OSError, RuntimeError, ValueError) as e:
            raise RuntimeError(f"获取模板内容失败: {e}") from e

    def _clear_template_cache(self, template_name: str):
        """Remove every cached render belonging to the given template."""
        prefix = template_name + ":"
        # Collect first: the dict must not change size while being iterated.
        stale_keys = [key for key in self._template_cache if key.startswith(prefix)]
        for key in stale_keys:
            del self._template_cache[key]

    def get_template_info(self, template_name: str) -> Dict[str, Any]:
        """Return metadata about a template.

        Args:
            template_name: Template name.

        Returns:
            A dict with name, path, size (raw and human readable), timestamps,
            referenced variable names and line count.

        Raises:
            RuntimeError: If the template does not exist, the name is invalid,
                or the file cannot be read.
        """
        try:
            template_path = self._resolve_template_path(template_name)

            if not template_path.exists():
                raise ValueError(f"模板不存在: {template_name}")

            stat = template_path.stat()
            content = template_path.read_text(encoding='utf-8')

            # Analyse the variables referenced by the template.
            variables = self._extract_template_variables(content)

            return {
                'name': template_name,
                'path': str(template_path),
                'size': stat.st_size,
                'size_human': self._format_size(stat.st_size),
                # NOTE: st_ctime is the metadata-change time on most Unix
                # systems, not necessarily the creation time.
                'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
                'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                'variables': variables,
                'line_count': len(content.splitlines()),
            }

        except (OSError, RuntimeError, ValueError) as e:
            raise RuntimeError(f"获取模板信息失败: {e}") from e

    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count as a human readable string such as "1.5 KB"."""
        if size_bytes == 0:
            return "0 B"

        size_names = ["B", "KB", "MB", "GB", "TB"]
        value = size_bytes
        unit_index = 0
        # Stop at the largest unit even if the value is still >= 1024.
        while value >= 1024.0 and unit_index < len(size_names) - 1:
            value /= 1024.0
            unit_index += 1

        return f"{value:.1f} {size_names[unit_index]}"

    def _extract_template_variables(self, content: str) -> List[str]:
        """Extract the names referenced in ``{{ ... }}`` expressions.

        This is a simple textual scan, not a real parse: for each expression
        the part before the first ``|`` is taken and only its leading
        identifier is kept, which drops filters, attribute access, indexing
        and call arguments. Expressions that do not start with an identifier
        (for example string literals) are skipped. Keywords such as ``not``
        are not filtered out.

        Args:
            content: Template source.

        Returns:
            Sorted list of unique names.
        """
        # Matches the {{ expression }} form only; {% ... %} blocks are ignored.
        expressions = re.findall(r'\{\{\s*([^}]+)\s*\}\}', content)

        names = set()
        for expression in expressions:
            head = expression.split('|')[0].strip()
            # Leading identifier only (Unicode letters allowed).
            identifier = re.match(r'[^\W\d]\w*', head)
            if identifier:
                names.add(identifier.group(0))

        return sorted(names)

    async def export_template(self, template_name: str, export_path: str) -> bool:
        """Export a template to the given path.

        Args:
            template_name: Template name.
            export_path: Destination file path; missing parent directories
                are created.

        Returns:
            True on success.

        Raises:
            RuntimeError: If the template cannot be read or the destination
                cannot be written.
        """
        try:
            content = self.get_template_content(template_name)
            export_file = Path(export_path)
            export_file.parent.mkdir(parents=True, exist_ok=True)
            export_file.write_text(content, encoding='utf-8')
            return True

        except (OSError, RuntimeError, ValueError) as e:
            raise RuntimeError(f"导出模板失败: {e}") from e

    async def import_template(self, import_path: str, template_name: Optional[str] = None) -> bool:
        """Import a template from the given path.

        An existing template with the same name is overwritten.

        Args:
            import_path: Source file path.
            template_name: Template name (optional, defaults to the file
                name). ``.j2`` is appended when missing.

        Returns:
            True on success.

        Raises:
            RuntimeError: If the source file does not exist, cannot be read,
                or the template cannot be created.
        """
        try:
            import_file = Path(import_path)
            if not import_file.exists():
                raise ValueError(f"导入文件不存在: {import_path}")

            if not template_name:
                template_name = import_file.name
            # Append the .j2 suffix when it is missing.
            if not template_name.endswith('.j2'):
                template_name += '.j2'

            content = import_file.read_text(encoding='utf-8')

            return await self.create_template(template_name, content, overwrite=True)

        except (OSError, RuntimeError, ValueError) as e:
            raise RuntimeError(f"导入模板失败: {e}") from e

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.