# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/tengmmvp/Seedream_MCP'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
自动保存核心模块
集成下载和文件管理,实现完整的自动保存逻辑
"""
import asyncio
import logging
from typing import Optional, Dict, Any, List, Union
from pathlib import Path
from datetime import datetime
from .download_manager import DownloadManager, DownloadError
from .file_manager import FileManager, FileManagerError
logger = logging.getLogger(__name__)
class AutoSaveError(Exception):
    """Raised when the auto-save workflow fails (e.g. an invalid URL)."""
class AutoSaveResult:
    """Outcome of a single auto-save attempt.

    Carries the source URL plus, on success, the local file path and a
    ready-to-use Markdown reference; on failure, an error message.
    An optional metadata mapping holds extra bookkeeping details.
    """

    def __init__(
        self,
        success: bool,
        original_url: str,
        local_path: Optional[str] = None,
        markdown_ref: Optional[str] = None,
        error: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        self.success = success
        self.original_url = original_url
        self.local_path = local_path
        self.markdown_ref = markdown_ref
        self.error = error
        # Normalize a missing metadata mapping to an empty dict.
        self.metadata = metadata or {}

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, omitting optional fields that are unset."""
        data: Dict[str, Any] = {
            'success': self.success,
            'original_url': self.original_url,
        }
        for field in ('local_path', 'markdown_ref', 'error', 'metadata'):
            value = getattr(self, field)
            if value:
                data[field] = value
        return data
class AutoSaveManager:
"""自动保存管理器"""
def __init__(
self,
base_dir: Optional[Path] = None,
download_timeout: int = 30,
max_retries: int = 3,
max_file_size: int = 50 * 1024 * 1024, # 50MB
max_concurrent: int = 5
):
"""
初始化自动保存管理器
Args:
base_dir: 基础保存目录
download_timeout: 下载超时时间
max_retries: 最大重试次数
max_file_size: 最大文件大小
max_concurrent: 最大并发下载数
"""
self.file_manager = FileManager(base_dir)
self.download_manager = DownloadManager(
timeout=download_timeout,
max_retries=max_retries,
max_file_size=max_file_size
)
self.max_concurrent = max_concurrent
async def save_image(
self,
url: str,
prompt: str = "",
tool_name: str = "seedream",
custom_name: Optional[str] = None,
alt_text: Optional[str] = None
) -> AutoSaveResult:
"""
保存单个图片
Args:
url: 图片URL
prompt: 生成提示词
tool_name: 工具名称
custom_name: 自定义文件名
alt_text: Markdown替代文本
Returns:
保存结果
"""
try:
logger.info(f"开始自动保存图片: {url}")
# 验证URL
if not self.download_manager.validate_url(url):
raise AutoSaveError(f"无效的URL: {url}")
# 创建保存路径
save_path = self.file_manager.create_save_path(
prompt=prompt,
url=url,
tool_name=tool_name,
custom_name=custom_name
)
# 下载图片
download_result = await self.download_manager.download_image(url, save_path)
# 生成Markdown引用
markdown_alt = alt_text or prompt or "Generated Image"
markdown_ref = self.file_manager.generate_markdown_reference(
save_path, markdown_alt
)
# 构建元数据
metadata = {
'prompt': prompt,
'tool_name': tool_name,
'save_time': datetime.now().isoformat(),
'file_size': download_result.get('file_size', 0),
'download_time': download_result.get('download_time', 0),
'content_type': download_result.get('content_type', ''),
'attempts': download_result.get('attempts', 1)
}
result = AutoSaveResult(
success=True,
original_url=url,
local_path=str(save_path),
markdown_ref=markdown_ref,
metadata=metadata
)
logger.info(f"图片保存成功: {save_path}")
return result
except (DownloadError, FileManagerError, AutoSaveError) as e:
logger.error(f"图片保存失败: {url} -> {e}")
return AutoSaveResult(
success=False,
original_url=url,
error=str(e)
)
except Exception as e:
logger.error(f"图片保存出现未知错误: {url} -> {e}")
return AutoSaveResult(
success=False,
original_url=url,
error=f"未知错误: {e}"
)
async def save_multiple_images(
self,
image_data: List[Dict[str, Any]],
tool_name: str = "seedream"
) -> List[AutoSaveResult]:
"""
批量保存多个图片
Args:
image_data: 图片数据列表,每个元素包含url、prompt等信息
tool_name: 工具名称
Returns:
保存结果列表
"""
logger.info(f"开始批量保存 {len(image_data)} 个图片")
# 创建保存任务
tasks = []
for data in image_data:
url = data.get('url', '')
prompt = data.get('prompt', '')
custom_name = data.get('custom_name')
alt_text = data.get('alt_text')
task = self.save_image(
url=url,
prompt=prompt,
tool_name=tool_name,
custom_name=custom_name,
alt_text=alt_text
)
tasks.append(task)
# 限制并发数量
semaphore = asyncio.Semaphore(self.max_concurrent)
async def save_with_semaphore(task):
async with semaphore:
return await task
# 执行所有任务
results = await asyncio.gather(
*[save_with_semaphore(task) for task in tasks],
return_exceptions=True
)
# 处理异常结果
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
url = image_data[i].get('url', 'unknown')
processed_results.append(AutoSaveResult(
success=False,
original_url=url,
error=str(result)
))
else:
processed_results.append(result)
# 统计结果
success_count = sum(1 for r in processed_results if r.success)
logger.info(f"批量保存完成: {success_count}/{len(image_data)} 成功")
return processed_results
def format_response_with_auto_save(
self,
original_response: Dict[str, Any],
auto_save_results: List[AutoSaveResult],
include_original_urls: bool = True
) -> Dict[str, Any]:
"""
格式化包含自动保存信息的响应
Args:
original_response: 原始API响应
auto_save_results: 自动保存结果列表
include_original_urls: 是否包含原始URL
Returns:
格式化后的响应
"""
response = original_response.copy()
# 添加自动保存信息
auto_save_info = {
'enabled': True,
'total_images': len(auto_save_results),
'successful_saves': sum(1 for r in auto_save_results if r.success),
'failed_saves': sum(1 for r in auto_save_results if not r.success),
'results': [r.to_dict() for r in auto_save_results]
}
response['auto_save'] = auto_save_info
# 添加本地路径和Markdown引用到图片信息中
images = response.get('images', [])
for i, (image, result) in enumerate(zip(images, auto_save_results)):
if result.success:
image['local_path'] = result.local_path
image['markdown_ref'] = result.markdown_ref
# 如果不包含原始URL,移除URL字段
if not include_original_urls and 'url' in image:
image['original_url'] = image.pop('url')
else:
image['auto_save_error'] = result.error
return response
def generate_markdown_summary(
self,
auto_save_results: List[AutoSaveResult],
title: str = "Generated Images"
) -> str:
"""
生成Markdown格式的图片摘要
Args:
auto_save_results: 自动保存结果列表
title: 摘要标题
Returns:
Markdown格式的摘要
"""
lines = [f"# {title}", ""]
successful_results = [r for r in auto_save_results if r.success]
failed_results = [r for r in auto_save_results if not r.success]
if successful_results:
lines.append("## Successfully Saved Images")
lines.append("")
for i, result in enumerate(successful_results, 1):
lines.append(f"### Image {i}")
if result.markdown_ref:
lines.append(result.markdown_ref)
if result.metadata and result.metadata.get('prompt'):
lines.append(f"**Prompt:** {result.metadata['prompt']}")
if result.local_path:
lines.append(f"**Local Path:** `{result.local_path}`")
lines.append("")
if failed_results:
lines.append("## Failed to Save")
lines.append("")
for i, result in enumerate(failed_results, 1):
lines.append(f"### Failed Image {i}")
lines.append(f"**URL:** {result.original_url}")
lines.append(f"**Error:** {result.error}")
lines.append("")
# 添加统计信息
lines.append("## Summary")
lines.append("")
lines.append(f"- Total images: {len(auto_save_results)}")
lines.append(f"- Successfully saved: {len(successful_results)}")
lines.append(f"- Failed to save: {len(failed_results)}")
return "\n".join(lines)
async def cleanup_old_files(self, days: int = 30) -> Dict[str, Any]:
"""
清理旧文件
Args:
days: 保留天数
Returns:
清理结果
"""
return self.file_manager.cleanup_old_files(days)
def get_storage_info(self) -> Dict[str, Any]:
"""
获取存储信息
Returns:
存储信息
"""
base_dir = self.file_manager.base_dir
try:
# 计算目录大小和文件数量
total_size = 0
file_count = 0
for file_path in base_dir.rglob("*"):
if file_path.is_file():
file_count += 1
total_size += file_path.stat().st_size
return {
'base_directory': str(base_dir),
'total_files': file_count,
'total_size_bytes': total_size,
'total_size_mb': round(total_size / (1024 * 1024), 2),
'directory_exists': base_dir.exists()
}
except Exception as e:
logger.error(f"获取存储信息失败: {e}")
return {
'base_directory': str(base_dir),
'error': str(e)
}