list_files
List result files for a task to see available outputs. Provide the task ID to get the file list.
Instructions
List the result files for a task.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| task_id | Yes | ID of the task whose result files should be listed | |
| server | No | Name of the server configuration to use | default |
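
As a rough illustration of the inputs above, an MCP client would typically invoke this tool with a JSON-RPC `tools/call` request. The sketch below only builds that request body; the helper name `build_list_files_call` and the task ID `task-123` are hypothetical, and the transport (stdio, HTTP) is left out.

```python
import json


def build_list_files_call(task_id: str, server: str = "default", request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 `tools/call` request for the list_files tool.

    Hypothetical helper for illustration; it is not part of the tool itself.
    """
    if not task_id:
        # task_id is the only required input in the schema.
        raise ValueError("task_id is required")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "list_files",
            "arguments": {"task_id": task_id, "server": server},
        },
    }


if __name__ == "__main__":
    # "task-123" is a made-up task ID.
    print(json.dumps(build_list_files_call("task-123"), indent=2))
```

Omitting `server` falls back to the schema default, `default`.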
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Formatted text listing the task's result files | |
Implementation Reference
- Core handler function that makes HTTP GET request to /api/v1/client/files/list/{task_id} to retrieve task file list, formats file info (id, filename, size, created_at), and returns structured result with error handling.

      import json
      import urllib.request


      def list_files(server_url, api_key, task_id):
          """
          Get the list of files for a task.

          Args:
              server_url: base URL of the server
              api_key: API key
              task_id: task ID
          """
          if not api_key:
              return {"error": "Missing API key; run register first to sign up"}
          if not task_id:
              return {"error": "Missing task_id"}

          server_url = server_url.rstrip("/")
          url = f"{server_url}/api/v1/client/files/list/{task_id}"

          try:
              headers = {"Authorization": f"Bearer {api_key}"}
              req = urllib.request.Request(url, headers=headers, method="GET")
              with urllib.request.urlopen(req, timeout=30) as response:
                  result = json.loads(response.read().decode("utf-8"))

              # The API may return a bare list or an object with a "files" key.
              files = result if isinstance(result, list) else result.get("files", [])
              formatted = []
              for f in files:
                  formatted.append({
                      "file_id": f.get("id") or f.get("file_id"),
                      "filename": f.get("filename", ""),
                      "size": f.get("size", 0),
                      "created_at": f.get("created_at", ""),
                  })

              return {
                  "content": f"Task {task_id} has {len(formatted)} file(s)",
                  "data": {"task_id": task_id, "files": formatted},
              }
          # The excerpt ends here; the except clause is not shown.

- MCP tool handler decorated with @mcp.tool() that lists task result files by calling the same API endpoint with server config and auth, returning a formatted string of file details.

      # quote is assumed to be urllib.parse.quote.
      from urllib.parse import quote

      # mcp, get_server_config, require_api_key, strip_trailing_slash,
      # safe_request, and OpenAaaSError are defined elsewhere in the project.


      @mcp.tool()
      def list_files(task_id: str, server: str = "default") -> str:
          """List the result files for a task."""
          if not task_id:
              return "❌ Missing required parameter: task_id"

          try:
              sc = get_server_config(server)
              api_key = require_api_key(server)
          except RuntimeError as e:
              return f"❌ {e}"

          server_url = strip_trailing_slash(sc.get("server_url", ""))
          # Percent-encode the task ID so it is safe as a single path segment.
          url = f"{server_url}/api/v1/client/files/list/{quote(task_id, safe='')}"

          try:
              result = safe_request(
                  "GET", url, headers={"Authorization": f"Bearer {api_key}"}
              )
          except OpenAaaSError as e:
              return f"❌ Failed to get file list: {e}"

          # The API may return a bare list or an object with a "files" key.
          files = result if isinstance(result, list) else result.get("files", [])
          if not files:
              return f"Task {task_id} has no result files"

          lines = [f"Task {task_id} has {len(files)} result file(s):"]
          for i, f in enumerate(files):
              if not isinstance(f, dict):
                  continue
              filename = f.get("filename") or f.get("name", "unnamed")
              file_id = f.get("id") or f.get("file_id", "")
              size = f.get("size")
              if size is None:
                  size = f.get("file_size", "unknown")
              lines.append(f"{i + 1}. {filename} (ID: {file_id}, size: {size})")

          return "\n".join(lines)

- Registered as an MCP tool via the @mcp.tool() decorator on the list_files function.

      @mcp.tool()
      def list_files(task_id: str, server: str = "default") -> str:
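
Both handlers above build the same endpoint URL and accept two response shapes (a bare list, or an object with a `files` key), with fallback field names such as `name` and `file_size`. The standalone sketch below isolates those two steps so they can be tried without a server. The function names `build_list_url` and `normalize_files` are hypothetical and do not appear in the project, and `https://example.com` is a placeholder host.

```python
from urllib.parse import quote


def build_list_url(server_url: str, task_id: str) -> str:
    """Build the file-list endpoint URL, percent-encoding the task ID."""
    base = server_url.rstrip("/")
    # safe='' also encodes "/", so the ID always stays a single path segment.
    return f"{base}/api/v1/client/files/list/{quote(task_id, safe='')}"


def normalize_files(payload) -> list:
    """Reduce either response shape to a list of uniform dicts."""
    if isinstance(payload, list):
        files = payload
    elif isinstance(payload, dict):
        files = payload.get("files", [])
    else:
        files = []

    normalized = []
    for entry in files:
        if not isinstance(entry, dict):
            continue  # skip malformed entries instead of failing
        size = entry.get("size")
        if size is None:
            size = entry.get("file_size")
        normalized.append({
            "file_id": entry.get("id") or entry.get("file_id", ""),
            "filename": entry.get("filename") or entry.get("name", "unnamed"),
            "size": size,
        })
    return normalized


if __name__ == "__main__":
    print(build_list_url("https://example.com/", "a/b c"))
    print(normalize_files({"files": [{"id": 1, "name": "out.txt", "file_size": 10}]}))
```

The core handler shown earlier interpolates `task_id` into the path as-is, while the MCP handler percent-encodes it; the encoded form is the safer choice when IDs might contain `/`, spaces, or other reserved characters.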