"""
模板管理服务
提供模板加载、渲染、验证和管理功能。
"""
import os
import logging
logger = logging.getLogger(__name__)
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
import json
try:
from jinja2 import Environment, FileSystemLoader, Template, TemplateSyntaxError, TemplateError
except ImportError:
raise ImportError("需要安装jinja2包: pip install jinja2>=3.1.0")
from data_access import FileSystemInterface
class TemplateService:
    """Template management service built on a Jinja2 environment."""

    def __init__(self, template_dir: Optional[str] = None, file_system: Optional[FileSystemInterface] = None):
        """
        Initialise the template service.

        Args:
            template_dir: Path of the template directory. When omitted (or
                empty), ``config/templates`` under
                ``Path(__file__).parent.parent.parent`` is used.
            file_system: File system interface; stored on the instance.
        """
        self.file_system = file_system

        if template_dir:
            self.template_dir = Path(template_dir)
        else:
            base_dir = Path(__file__).parent.parent.parent
            self.template_dir = base_dir / "config" / "templates"

        # Make sure the template directory exists.
        self.template_dir.mkdir(parents=True, exist_ok=True)

        # Build the Jinja2 environment rooted at the template directory.
        loader = FileSystemLoader(str(self.template_dir))
        self.env = Environment(
            loader=loader,
            trim_blocks=True,
            lstrip_blocks=True,
            keep_trailing_newline=True,
        )

        # Register the custom filters on the environment.
        self._add_custom_filters()

        # Cache of rendered output.
        self._template_cache = {}
def _add_custom_filters(self):
    """
    Register the custom filters on the Jinja2 environment.

    Installs ``format_size``, ``format_datetime``, ``file_icon`` and
    ``truncate`` into ``self.env.filters``.
    """

    def format_size(size_bytes: int) -> str:
        """Format a byte count as a human-readable size (B up to TB)."""
        if size_bytes == 0:
            return "0 B"
        size_names = ["B", "KB", "MB", "GB", "TB"]
        i = 0
        while size_bytes >= 1024.0 and i < len(size_names) - 1:
            size_bytes /= 1024.0
            i += 1
        return f"{size_bytes:.1f} {size_names[i]}"

    def format_datetime(timestamp: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> str:
        """
        Format an ISO-8601 timestamp string (a trailing ``Z`` is accepted).

        ``datetime`` objects are formatted directly. Any value that cannot
        be parsed is logged and returned unchanged, so one bad timestamp
        does not abort the whole render.
        """
        if isinstance(timestamp, datetime):
            return timestamp.strftime(format_str)
        try:
            dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
            return dt.strftime(format_str)
        except (ValueError, TypeError, AttributeError) as e:
            # fromisoformat raises ValueError on malformed text; non-string
            # input fails earlier with AttributeError/TypeError.
            logger.warning("操作失败: %s", e)
            return timestamp

    def get_file_type_icon(file_type: str) -> str:
        """Return the icon for a file type, falling back to the generic one."""
        icons = {
            'code': '📝',
            'documentation': '📚',
            'image': '🖼️',
            'video': '🎬',
            'audio': '🎵',
            'archive': '📦',
            'config': '⚙️',
            'other': '📄',
        }
        return icons.get(file_type, '📄')

    def truncate_text(text: str, length: int = 50) -> str:
        """Cut ``text`` to ``length`` characters, appending "..." when cut."""
        if len(text) <= length:
            return text
        return text[:length] + "..."

    # Register the filters under the names templates use.
    self.env.filters['format_size'] = format_size
    self.env.filters['format_datetime'] = format_datetime
    self.env.filters['file_icon'] = get_file_type_icon
    self.env.filters['truncate'] = truncate_text
async def render_template(self, template_name: str, context: Dict[str, Any]) -> str:
    """
    Render a named template with the supplied context.

    Rendered output is cached under a key built from the template name and
    the hash of the context's string form; a cache hit is returned as-is
    without loading or rendering the template again.

    Args:
        template_name: Name of the template to load from the environment.
        context: Template context data; entries override the default context.

    Returns:
        The rendered content.

    Raises:
        ValueError: On a template syntax error or another Jinja2 template error.
        RuntimeError: When a RuntimeError or ValueError is raised while
            loading or rendering.
    """
    try:
        cache = self._template_cache
        entry_key = f"{template_name}:{hash(str(context))}"
        if entry_key in cache:
            return cache[entry_key]

        template = self.env.get_template(template_name)

        # Start from the defaults, then let the caller's values win.
        render_vars = self._get_default_context()
        render_vars.update(context)
        output = template.render(**render_vars)

        # Simple bounded cache: once 100 entries exist, nothing new is stored.
        if len(cache) < 100:
            cache[entry_key] = output
        return output
    except TemplateSyntaxError as exc:
        raise ValueError(f"模板语法错误: {exc.message} (行 {exc.lineno})") from exc
    except TemplateError as exc:
        raise ValueError(f"模板渲染错误: {exc.message}") from exc
    except (RuntimeError, ValueError) as exc:
        raise RuntimeError(f"渲染模板失败: {exc}") from exc
def _get_default_context(self) -> Dict[str, Any]:
    """
    Build the default template context.

    Returns:
        A new dict with the current time (``now``), the generator name and
        version, and the registered helper filters under ``functions``.
    """
    registered = self.env.filters
    helper_names = ('format_size', 'format_datetime', 'file_icon', 'truncate')
    defaults: Dict[str, Any] = {
        'now': datetime.now(),
        'generator': 'Optimized Folder Docs MCP Server',
        'version': '2.0.0',
    }
    defaults['functions'] = {name: registered[name] for name in helper_names}
    return defaults
async def render_template_string(self, template_string: str, context: Dict[str, Any]) -> str:
    """
    Render a template given as a string.

    Args:
        template_string: Template source text.
        context: Template context data; entries override the default context.

    Returns:
        The rendered content.

    Raises:
        ValueError: On a template syntax error.
        RuntimeError: When a RuntimeError or ValueError is raised while
            compiling or rendering.
    """
    try:
        compiled = self.env.from_string(template_string)

        # Start from the defaults, then let the caller's values win.
        render_vars = self._get_default_context()
        render_vars.update(context)

        output = compiled.render(**render_vars)
        return output
    except TemplateSyntaxError as exc:
        raise ValueError(f"模板语法错误: {exc.message} (行 {exc.lineno})") from exc
    except (RuntimeError, ValueError) as exc:
        raise RuntimeError(f"渲染模板字符串失败: {exc}") from exc
def list_available_templates(self) -> List[str]:
    """
    List the templates available in the template directory.

    Only regular files directly inside ``self.template_dir`` whose names
    match ``*.j2`` are reported; a directory that happens to be named
    ``something.j2`` is skipped, in line with ``template_exists``.

    Returns:
        Template file names, sorted.

    Raises:
        RuntimeError: If the template directory cannot be scanned.
    """
    try:
        return sorted(
            entry.name
            for entry in self.template_dir.glob("*.j2")
            if entry.is_file()
        )
    except (OSError, RuntimeError, ValueError) as e:
        # Filesystem failures surface as OSError; wrap them like the rest.
        raise RuntimeError(f"获取模板列表失败: {e}") from e
def template_exists(self, template_name: str) -> bool:
    """
    Check whether a template exists.

    Args:
        template_name: Template name, relative to the template directory.

    Returns:
        True when the path exists and is a regular file; False otherwise,
        including when one of the handled errors is raised during the check.
    """
    try:
        candidate = self.template_dir / template_name
        if not candidate.exists():
            return False
        return candidate.is_file()
    except (RuntimeError, TemplateError, TypeError):
        return False
async def validate_template(self, template_content: str) -> Dict[str, Any]:
    """
    Validate template syntax by parsing it with the Jinja2 environment.

    Args:
        template_content: Template source text.

    Returns:
        A dict with ``valid`` (bool), ``errors`` (list of error dicts) and
        ``warnings`` (always an empty list). A syntax error is reported with
        its message, line and template name; a RuntimeError or ValueError
        is reported as ``unknown_error``.
    """
    problems: List[Dict[str, Any]] = []
    try:
        # Parsing alone is enough to surface syntax errors.
        self.env.parse(template_content)
    except TemplateSyntaxError as exc:
        problems.append({
            'type': 'syntax_error',
            'message': exc.message,
            'line': exc.lineno,
            'name': exc.name,
        })
    except (RuntimeError, ValueError) as exc:
        problems.append({
            'type': 'unknown_error',
            'message': str(exc),
        })
    return {
        'valid': not problems,
        'errors': problems,
        'warnings': [],
    }
async def create_template(self, template_name: str, content: str, overwrite: bool = False) -> bool:
    """
    Create a new template file.

    Args:
        template_name: Template name, relative to the template directory.
        content: Template source text; it must pass ``validate_template``.
        overwrite: Whether an existing template may be replaced.

    Returns:
        True when the template was written.

    Raises:
        RuntimeError: If the template already exists and ``overwrite`` is
            false, if the content fails validation, or if the file cannot
            be written.
    """
    try:
        template_path = self.template_dir / template_name
        # Refuse to clobber an existing template unless explicitly allowed.
        if template_path.exists() and not overwrite:
            raise ValueError(f"模板已存在: {template_name}")
        # Validate before touching the disk so a bad template is never stored.
        validation = await self.validate_template(content)
        if not validation['valid']:
            raise ValueError(f"模板语法错误: {validation['errors']}")
        template_path.write_text(content, encoding='utf-8')
        # Drop cached renders of the previous version of this template.
        self._clear_template_cache(template_name)
        return True
    except (OSError, RuntimeError, ValueError) as e:
        # OSError covers write failures (missing parent directory,
        # permissions, ...), which previously escaped unwrapped.
        raise RuntimeError(f"创建模板失败: {e}") from e
async def delete_template(self, template_name: str) -> bool:
    """
    Delete a template file.

    Args:
        template_name: Template name, relative to the template directory.

    Returns:
        True when the template was deleted.

    Raises:
        RuntimeError: If the template does not exist or cannot be removed.
    """
    try:
        template_path = self.template_dir / template_name
        if not template_path.exists():
            raise ValueError(f"模板不存在: {template_name}")
        template_path.unlink()
        # Drop cached renders of the deleted template.
        self._clear_template_cache(template_name)
        return True
    except (OSError, RuntimeError, ValueError) as e:
        # OSError covers unlink failures (path is a directory, permissions,
        # file removed between the check and the unlink), which previously
        # escaped unwrapped.
        raise RuntimeError(f"删除模板失败: {e}") from e
def get_template_content(self, template_name: str) -> str:
    """
    Read the source text of a template.

    Args:
        template_name: Template name, relative to the template directory.

    Returns:
        The template content, decoded as UTF-8.

    Raises:
        RuntimeError: If the template does not exist or cannot be read.
    """
    try:
        template_path = self.template_dir / template_name
        if not template_path.exists():
            raise ValueError(f"模板不存在: {template_name}")
        return template_path.read_text(encoding='utf-8')
    except (OSError, RuntimeError, ValueError) as e:
        # OSError covers read failures (path is a directory, permissions,
        # ...), which previously escaped unwrapped.
        raise RuntimeError(f"获取模板内容失败: {e}") from e
def _clear_template_cache(self, template_name: str):
    """Remove every cached render whose key starts with ``<template_name>:``."""
    cache = self._template_cache

    def belongs_to_template(key):
        return key.startswith(template_name + ":")

    # Collect the matching keys first; the dict cannot shrink while iterated.
    for stale_key in tuple(filter(belongs_to_template, cache)):
        del cache[stale_key]
def get_template_info(self, template_name: str) -> Dict[str, Any]:
    """
    Collect metadata about a template file.

    Args:
        template_name: Template name, relative to the template directory.

    Returns:
        A dict with the keys ``name``, ``path``, ``size``, ``size_human``,
        ``created_time`` and ``modified_time`` (ISO-8601 strings),
        ``variables`` and ``line_count``.

    Raises:
        RuntimeError: If the template does not exist or cannot be read.
    """
    try:
        template_path = self.template_dir / template_name
        if not template_path.exists():
            raise ValueError(f"模板不存在: {template_name}")
        file_stat = template_path.stat()
        content = template_path.read_text(encoding='utf-8')
        # Names referenced in {{ ... }} expressions.
        variables = self._extract_template_variables(content)
        # st_ctime is the metadata-change time on Unix, not the creation
        # time; use st_birthtime on platforms that expose it.
        created_ts = getattr(file_stat, 'st_birthtime', file_stat.st_ctime)
        return {
            'name': template_name,
            'path': str(template_path),
            'size': file_stat.st_size,
            'size_human': self._format_size(file_stat.st_size),
            'created_time': datetime.fromtimestamp(created_ts).isoformat(),
            'modified_time': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
            'variables': variables,
            'line_count': len(content.splitlines()),
        }
    except (OSError, RuntimeError, ValueError) as e:
        # OSError covers stat/read failures, which previously escaped unwrapped.
        raise RuntimeError(f"获取模板信息失败: {e}") from e
def _format_size(self, size_bytes: int) -> str:
    """
    Format a byte count as a human-readable size.

    Args:
        size_bytes: Size in bytes.

    Returns:
        ``"0 B"`` for zero; otherwise the value with one decimal place and
        the largest unit (B, KB, MB, GB, TB) reached by dividing by 1024.
    """
    if size_bytes == 0:
        return "0 B"
    units = ("B", "KB", "MB", "GB", "TB")
    value = size_bytes
    for unit in units[:-1]:
        # Stop at the first unit in which the value is no longer >= 1024.
        if not value >= 1024.0:
            return f"{value:.1f} {unit}"
        value /= 1024.0
    # Anything left over is expressed in the largest unit.
    return f"{value:.1f} {units[-1]}"
def _extract_template_variables(self, content: str) -> List[str]:
    """
    Extract the variable names used in ``{{ ... }}`` expressions.

    For each expression, the text before the first ``|`` is taken and then
    cut at the first ``.``, so ``{{ user.name | upper }}`` yields ``user``.

    Args:
        content: Template source text.

    Returns:
        The distinct, non-empty names, sorted.
    """
    import re

    # Simple extraction: matches the {{ variable }} form only.
    expression_re = re.compile(r'\{\{\s*([^}]+)\s*\}\}')
    names = set()
    for expression in expression_re.findall(content):
        before_filters = expression.partition('|')[0].strip()
        root_name = before_filters.partition('.')[0].strip()
        if root_name:
            names.add(root_name)
    return sorted(names)
async def export_template(self, template_name: str, export_path: str) -> bool:
    """
    Export a template to the given path.

    Missing parent directories of ``export_path`` are created.

    Args:
        template_name: Name of the template to export.
        export_path: Destination file path.

    Returns:
        True when the template was written.

    Raises:
        RuntimeError: If the template cannot be read or the destination
            cannot be written.
    """
    try:
        content = self.get_template_content(template_name)
        export_file = Path(export_path)
        export_file.parent.mkdir(parents=True, exist_ok=True)
        export_file.write_text(content, encoding='utf-8')
        return True
    except (OSError, RuntimeError, ValueError) as e:
        # OSError covers mkdir/write failures, which previously escaped unwrapped.
        raise RuntimeError(f"导出模板失败: {e}") from e
async def import_template(self, import_path: str, template_name: Optional[str] = None) -> bool:
"""
Import a template from the given path.
The file is read as UTF-8 and handed to create_template with overwrite=True.
Args:
import_path: Path of the file to import.
template_name: Template name (optional; defaults to the file name).
Returns:
The result of create_template (True on success).
Raises:
RuntimeError: If the import file does not exist, or if a RuntimeError or
ValueError is raised while reading the file or creating the template.
"""
# NOTE(review): only RuntimeError/ValueError are wrapped below; an OSError
# from read_text (e.g. import_path is a directory) would propagate as-is.
try:
import_file = Path(import_path)
if not import_file.exists():
raise ValueError(f"导入文件不存在: {import_path}")
if not template_name:
template_name = import_file.name
# Append the .j2 suffix when the name does not already end with it.
# NOTE(review): indentation was lost in this file, so it is unclear whether
# this check applies only to the defaulted name or to every name - confirm
# when restoring the indentation.
if not template_name.endswith('.j2'):
template_name += '.j2'
content = import_file.read_text(encoding='utf-8')
return await self.create_template(template_name, content, overwrite=True)
except (RuntimeError, ValueError) as e:
raise RuntimeError(f"导入模板失败: {e}") from e