veo_get_tasks_batch
Query multiple video generation tasks at once to get their statuses and results in a single request.
Instructions
Check the status of several video generation tasks in a single request.
This takes fewer calls than invoking veo_get_task once per task.
Use this when:
- You have multiple pending generations to check
- You want to get the status of several videos at once
- You're tracking a batch of generations
Returns:
Status and video information for all queried tasks.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| task_ids | Yes | List of task IDs to query. Maximum recommended batch size is 50 tasks. | |
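
Because the recommended maximum is 50 task IDs per call, a longer list is best split before it is passed as `task_ids`. The following is a minimal sketch of that splitting, assuming a hypothetical helper named `chunk_task_ids` that is not part of the server's code; the IDs are placeholders.

```python
from collections.abc import Iterator


def chunk_task_ids(task_ids: list[str], batch_size: int = 50) -> Iterator[list[str]]:
    """Yield consecutive slices of task_ids, each at most batch_size long.

    Hypothetical helper for illustration. The default of 50 mirrors the
    recommended maximum in the input schema; it is not known to be a hard
    limit enforced by the server.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(task_ids), batch_size):
        yield task_ids[start:start + batch_size]


# Placeholder IDs: 120 of them are split into batches of 50, 50, and 20.
ids = [f"task-{n}" for n in range(120)]
batches = list(chunk_task_ids(ids))
print([len(batch) for batch in batches])
```

Each yielded batch can then be sent as the `task_ids` argument of one veo_get_tasks_batch call.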
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
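| result | Yes | Formatted text: the total task count, then each task's ID, creation time, success flag, and video URLs. On failure, a single line with the error code and message. | |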
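
The result is a formatted string, not JSON, so a caller that needs structured data has to parse it. Below is a rough sketch of such a parser, assuming the line layout produced by the handler listed under Implementation Reference ("Total Tasks:", "=== Task: ... ===", "Created At:", "Success:", and "- id: url" lines). The function name `parse_batch_result` and the sample values are hypothetical, and the exact indentation of the video lines is not certain from the listing, so leading whitespace is ignored.

```python
import re
from typing import Any


def parse_batch_result(text: str) -> dict[str, Any]:
    """Parse the tool's text output into a dictionary (illustrative only)."""
    parsed: dict[str, Any] = {"total": None, "tasks": []}
    current: dict[str, Any] | None = None
    for line in text.splitlines():
        stripped = line.strip()
        if stripped.startswith("Total Tasks: "):
            parsed["total"] = int(stripped.removeprefix("Total Tasks: "))
        elif match := re.fullmatch(r"=== Task: (.*) ===", stripped):
            # A task header starts a new record.
            current = {"id": match.group(1), "created_at": None, "success": None, "videos": []}
            parsed["tasks"].append(current)
        elif current is not None and stripped.startswith("Created At: "):
            current["created_at"] = stripped.removeprefix("Created At: ")
        elif current is not None and stripped.startswith("Success: "):
            current["success"] = stripped.removeprefix("Success: ") == "True"
        elif current is not None and stripped.startswith("- "):
            # Video lines look like "- <video id>: <video url>".
            video_id, _, url = stripped[2:].partition(": ")
            current["videos"].append({"id": video_id, "video_url": url})
    return parsed


# Sample text with placeholder values, shaped like the handler's output.
sample = "\n".join(
    [
        "Total Tasks: 1",
        "",
        "=== Task: task-1 ===",
        "Created At: 2024-01-01T00:00:00Z",
        "Success: True",
        " - vid-1: https://example.com/vid-1.mp4",
        "",
    ]
)
print(parse_batch_result(sample))
```

An error response ("Error: code - message") contains none of these markers, so the parser returns `total` as `None` with an empty task list; callers may want to check for the "Error:" prefix first.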
Implementation Reference
- tools/task_tools.py:53-99 (handler) The handler function for the veo_get_tasks_batch tool. It is decorated with @mcp.tool(), accepts a list of task IDs, calls client.query_task() with action='retrieve_batch', and formats the response into a readable string with the total count, per-task status, and video URLs.

      @mcp.tool()
      async def veo_get_tasks_batch(
          task_ids: Annotated[
              list[str],
              Field(description="List of task IDs to query. Maximum recommended batch size is 50 tasks."),
          ],
      ) -> str:
          """Query multiple video generation tasks at once.

          Efficiently check the status of multiple tasks in a single request.
          More efficient than calling veo_get_task multiple times.

          Use this when:
          - You have multiple pending generations to check
          - You want to get status of several videos at once
          - You're tracking a batch of generations

          Returns:
              Status and video information for all queried tasks.
          """
          result = await client.query_task(
              ids=task_ids,
              action="retrieve_batch",
          )

          if "error" in result:
              error = result.get("error", {})
              return f"Error: {error.get('code', 'unknown')} - {error.get('message', 'Unknown error')}"

          lines = [f"Total Tasks: {result.get('count', 0)}", ""]

          for item in result.get("items", []):
              response_info = item.get("response", {})
              lines.extend(
                  [
                      f"=== Task: {item.get('id', 'N/A')} ===",
                      f"Created At: {item.get('created_at', 'N/A')}",
                      f"Success: {response_info.get('success', False)}",
                  ]
              )

              for video in response_info.get("data", []):
                  lines.append(f" - {video.get('id', 'Unknown')}: {video.get('video_url', 'N/A')}")
              lines.append("")

          return "\n".join(lines)

- tools/task_tools.py:52-53 (registration) The @mcp.tool() decorator on veo_get_tasks_batch registers it as an MCP tool with the FastMCP server instance (imported from core.server).

      @mcp.tool()

- core/utils.py:7-30 (helper) The _with_submission_guidance() helper receives 'veo_get_tasks_batch' as its batch_poll_tool argument and adds async submission metadata that names this tool for polling guidance.

      def _with_submission_guidance(
          data: dict[str, Any], poll_tool: str, batch_poll_tool: str | None = None
      ) -> dict[str, Any]:
          payload = dict(data)
          task_id = payload.get("task_id")
          if not task_id:
              return payload

          payload["mcp_async_submission"] = {
              "task_id": task_id,
              "poll_tool": poll_tool,
              "batch_poll_tool": batch_poll_tool,
              "polling_interval_seconds": 15,
              "max_poll_attempts": 100,
              "expected_wait_seconds": 600,
              "next_step": (
                  f'Call {poll_tool}(task_id="{task_id}") to poll until the task completes '
                  f"and the final media URLs are available. "
                  f"IMPORTANT: Media generation typically takes 1-5 minutes. "
                  f"Wait at least 15 seconds between each poll. "
                  f"Keep polling for up to 100 attempts. Do NOT stop early — the task is still running."
              ),
          }
          return payload

- core/utils.py:33-55 (helper) The _with_task_guidance() helper receives 'veo_get_tasks_batch' as its batch_poll_tool argument and adds task polling metadata that names this tool for guidance.

      def _with_task_guidance(
          data: dict[str, Any], poll_tool: str, batch_poll_tool: str | None = None
      ) -> dict[str, Any]:
          payload = dict(data)
          task_id = payload.get("id") or payload.get("task_id")
          if not task_id:
              return payload

          payload["mcp_task_polling"] = {
              "task_id": task_id,
              "poll_tool": poll_tool,
              "batch_poll_tool": batch_poll_tool,
              "polling_interval_seconds": 15,
              "max_poll_attempts": 100,
              "next_step": (
                  f"The task is still pending or processing. "
                  f'Wait 15 seconds, then call {poll_tool}(task_id="{task_id}") again. '
                  f"IMPORTANT: Media generation typically takes 1-5 minutes. "
                  f"Keep polling — do NOT give up or tell the user it failed. "
                  f"You should poll at least 100 times before considering the task stuck."
              ),
          }
          return payload

- core/client.py:183-187 (handler) The query_task() method on VeoClient that veo_get_tasks_batch calls. It accepts **kwargs (including ids=task_ids and action='retrieve_batch') and sends a POST request to the '/veo/tasks' endpoint.

      async def query_task(self, **kwargs: Any) -> dict[str, Any]:
          """Query task status using the tasks endpoint."""
          task_id = kwargs.get("id") or kwargs.get("ids", [])
          logger.info(f"🔍 Querying task(s): {task_id}")
          return await self.request("/veo/tasks", kwargs)
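
The polling guidance in the helpers (a 15-second interval, up to 100 attempts) can be combined with a batch query roughly as sketched below. This is an illustration under stated assumptions, not the server's code: `poll_batch`, `fetch_batch`, and `is_finished` are hypothetical names, `fetch_batch` stands in for whatever performs the batch query, and the caller supplies `is_finished` because the listings above do not show which response field marks a task as complete.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def poll_batch(
    fetch_batch: Callable[[list[str]], Awaitable[dict[str, Any]]],
    is_finished: Callable[[dict[str, Any]], bool],
    task_ids: list[str],
    interval_seconds: float = 15,
    max_attempts: int = 100,
) -> dict[str, dict[str, Any]]:
    """Poll pending tasks in one batch per attempt until all are finished.

    Returns finished items keyed by task ID. IDs that never finish within
    max_attempts are simply absent from the result.
    """
    finished: dict[str, dict[str, Any]] = {}
    pending = list(task_ids)
    for attempt in range(max_attempts):
        result = await fetch_batch(pending)
        for item in result.get("items", []):
            if "id" in item and is_finished(item):
                finished[item["id"]] = item
        # Only re-query the tasks that are still outstanding.
        pending = [task_id for task_id in pending if task_id not in finished]
        if not pending:
            break
        if attempt < max_attempts - 1:
            await asyncio.sleep(interval_seconds)
    return finished


async def _demo() -> None:
    calls = 0

    async def fake_fetch(ids: list[str]) -> dict[str, Any]:
        # Stand-in for a real batch query: tasks "finish" on the second poll.
        nonlocal calls
        calls += 1
        return {"items": [{"id": task_id, "done": calls >= 2} for task_id in ids]}

    finished = await poll_batch(
        fake_fetch, lambda item: item["done"], ["task-a", "task-b"], interval_seconds=0
    )
    print(sorted(finished))


if __name__ == "__main__":
    asyncio.run(_demo())
```

Querying only the still-pending IDs on each attempt keeps later batches small, which also helps stay under the recommended 50-task batch size.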