wait_job_completion
Polls a job's status at a fixed interval until the job succeeds, fails, or the timeout elapses. Returns the final status, or an error if the job failed or timed out.
Instructions
Wait for a job to complete
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| job_id | Yes | The ID of the job to get the status of | |
| interval | No | The interval to check the status of the job (seconds) | 1 |
| timeout | No | The timeout to wait for the job to complete (seconds) | 300 |
| api_key | No | PDF.co API key. If not provided, will use X_API_KEY environment variable. (Optional) | "" |
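
As a rough illustration of how these inputs fit together, the sketch below assembles an argument set using the defaults from the handler signature (a 1-second interval and a 300-second timeout) and estimates how many status checks a call could make. Both `build_wait_args` and `max_status_checks` are hypothetical helpers, not part of the PDF.co MCP server. The `X_API_KEY` fallback is written out here only to mirror the parameter description; where the server actually resolves the key is not shown in this reference, so treat that part as an assumption.

```python
import os


def build_wait_args(job_id, interval=1, timeout=300, api_key=""):
    """Hypothetical helper: assemble arguments for wait_job_completion."""
    if not job_id:
        raise ValueError("job_id is required")
    # Assumed fallback: an empty api_key defers to the X_API_KEY variable.
    resolved_key = api_key or os.environ.get("X_API_KEY", "")
    return {
        "job_id": job_id,
        "interval": interval,
        "timeout": timeout,
        "api_key": resolved_key,
    }


def max_status_checks(interval, timeout):
    """Upper bound on status checks, ignoring request latency.

    The handler checks first, sleeps `interval` seconds, and gives up once
    the elapsed time exceeds `timeout`, so checks happen at roughly
    0, interval, 2 * interval, ... seconds.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    return timeout // interval + 1


if __name__ == "__main__":
    print(max_status_checks(1, 300))   # 301
    print(max_status_checks(5, 300))   # 61
```

Because the handler adds up `credits_used` on every check, a longer interval reduces the number of status calls a long-running job can trigger.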
Implementation Reference
- pdfco/mcp/tools/apis/job.py:57-106 (handler) The main tool handler for 'wait_job_completion'. It polls job status via _get_job_status in a loop, waiting for a 'success' or 'failed' status. It supports a configurable interval (default 1s) and timeout (default 300s), and accumulates credits_used across checks.
```python
@mcp.tool()
async def wait_job_completion(
    job_id: str = Field(description="The ID of the job to get the status of"),
    interval: int = Field(
        description="The interval to check the status of the job (seconds)", default=1
    ),
    timeout: int = Field(
        description="The timeout to wait for the job to complete (seconds)", default=300
    ),
    api_key: str = Field(
        description="PDF.co API key. If not provided, will use X_API_KEY environment variable. (Optional)",
        default="",
    ),
) -> BaseResponse:
    """
    Wait for a job to complete
    """
    start_time = time.time()
    job_check_count = 0
    credits_used = 0
    credits_remaining = 0
    while True:
        response = await _get_job_status(job_id, api_key=api_key)
        job_check_count += 1
        credits_used += response.credits_used or 0
        credits_remaining = response.credits_remaining or 0
        if response.status == "success":
            return BaseResponse(
                status="success",
                content=response.content,
                credits_used=credits_used,
                credits_remaining=credits_remaining,
                tips=f"Job check count: {job_check_count}",
            )
        elif response.status == "failed":
            return BaseResponse(
                status="error",
                content=response.content,
                credits_used=credits_used,
                credits_remaining=credits_remaining,
            )
        await asyncio.sleep(interval)
        if time.time() - start_time > timeout:
            return BaseResponse(
                status="error",
                content="Job timed out",
                credits_used=credits_used,
                credits_remaining=credits_remaining,
                tips=f"Job check count: {job_check_count}",
            )
```

- pdfco/mcp/tools/apis/job.py:10-34 (helper) Internal helper function that performs the actual API call, a POST to /v1/job/check, to get the job status. Used by both the wait_job_completion and get_job_check tools.
```python
async def _get_job_status(job_id: str, api_key: str = "") -> BaseResponse:
    """
    Internal helper function to check job status without MCP tool decoration
    """
    try:
        async with PDFCoClient(api_key=api_key) as client:
            response = await client.post(
                "/v1/job/check",
                json={
                    "jobId": job_id,
                },
            )
            json_data = response.json()
            return BaseResponse(
                status=json_data["status"],
                content=json_data,
                credits_used=json_data.get("credits"),
                credits_remaining=json_data.get("remainingCredits"),
                tips="You can download the result if status is success",
            )
    except Exception as e:
        return BaseResponse(
            status="error",
            content=str(e),
        )
```

- pdfco/mcp/models.py:5-11 (schema) The BaseResponse model used as the return type for wait_job_completion. Contains status, content, credits_used, credits_remaining, and optional tips.
```python
class BaseResponse(BaseModel):
    status: str
    content: Any
    credits_used: int | None = None
    credits_remaining: int | None = None
    tips: str | None = None
```

- pdfco/mcp/__init__.py:18-43 (registration) Registration entry point: imports the job module (which triggers the @mcp.tool() decorators) and runs the MCP server. The @mcp.tool() decorator on wait_job_completion registers it as a tool.
```python
def main():
    if len(sys.argv) > 1:
        transport = sys.argv[1]
        if transport == "stdio":
            mcp.run(transport=transport)
        elif transport == "sse":
            # Expected argv: <program> sse <port>
            if len(sys.argv) < 3:
                raise ValueError("SSE transport requires a port number")
            port = int(sys.argv[2])
            mcp.run(transport=transport, host="0.0.0.0", port=port)
        elif transport == "streamable-http":
            # Expected argv: <program> streamable-http <port> <path>
            if len(sys.argv) < 4:
                raise ValueError(
                    "Streamable HTTP transport requires a port number and path"
                )
            port = int(sys.argv[2])
            path = sys.argv[3]
            mcp.run(transport=transport, host="0.0.0.0", port=port, path=path)
        else:
            raise ValueError(f"Invalid transport: {transport}")
    else:
        # No transport argument: default to stdio
        mcp.run(transport="stdio")


if __name__ == "__main__":
    main()
```
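
The polling behavior of the handler can be tried without a PDF.co account. The following is a minimal sketch, not the server's code: `Result` is a stand-in for `BaseResponse`, `make_fake_check` is a hypothetical stub that replaces `_get_job_status` with canned statuses, and `"working"` is only a placeholder for any status other than `"success"` or `"failed"`. It also uses `time.monotonic()` in place of the handler's `time.time()`, a substitution made here so the elapsed-time measurement is unaffected by system clock changes.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Result:
    """Stand-in for BaseResponse, limited to the fields this sketch uses."""
    status: str
    content: Any = None
    credits_used: Optional[int] = None
    checks: int = 0


async def poll_until_done(check, interval=1.0, timeout=300.0):
    """Call `check` until it reports success/failed or `timeout` elapses."""
    start = time.monotonic()
    checks = 0
    credits = 0
    while True:
        response = await check()
        checks += 1
        credits += response.credits_used or 0  # accumulate across checks
        if response.status == "success":
            return Result("success", response.content, credits, checks)
        if response.status == "failed":
            return Result("error", response.content, credits, checks)
        # Any other status counts as "still running": wait, then re-check.
        await asyncio.sleep(interval)
        if time.monotonic() - start > timeout:
            return Result("error", "Job timed out", credits, checks)


def make_fake_check(statuses):
    """Hypothetical stub for _get_job_status: replays canned statuses."""
    remaining = list(statuses)

    async def check():
        # Consume statuses in order, then keep repeating the last one.
        status = remaining.pop(0) if len(remaining) > 1 else remaining[0]
        return Result(status=status, content={"status": status}, credits_used=2)

    return check


async def demo():
    finished = await poll_until_done(
        make_fake_check(["working", "working", "success"]),
        interval=0.01,
        timeout=1,
    )
    stuck = await poll_until_done(
        make_fake_check(["working"]), interval=0.01, timeout=0.05
    )
    return finished, stuck


if __name__ == "__main__":
    finished, stuck = asyncio.run(demo())
    print(finished.status, finished.checks, finished.credits_used)  # success 3 6
    print(stuck.status, stuck.content)  # error Job timed out
```

As in the handler, the timeout is only evaluated after each sleep, so a call can run slightly longer than `timeout` before it returns.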