get_job_details
Retrieve detailed information about a scheduled notebook job, including job ID, name, description, timestamps, execution counts, config settings, and metadata by specifying the job's UUID.
Instructions
Retrieve comprehensive information about a scheduled notebook job.
Returns:
- jobID: Unique identifier (UUID format)
- name: Display name of the job
- description: Human-readable job description
- createdAt: Creation timestamp (ISO 8601)
- terminatedAt: End timestamp if completed
- completedExecutionsCount: Number of successful runs
- enqueuedBy: ID of the user who created the job
- executionConfig: Notebook path and runtime settings
- schedule: Mode, interval, and start time
- targetConfig: Database and workspace settings
- jobMetadata: Execution statistics and status
Args:
job_id: UUID of the scheduled job to retrieve details for
Returns:
Dictionary with job details
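The handler listed under Implementation Reference wraps its result in an envelope with a `status` field, plus `data` on success or `errorCode` on failure. A caller might unpack it roughly as follows. This is a minimal sketch: `summarize_job_response` and both sample payloads are hypothetical, and only the key names are taken from that handler.

```python
from typing import Any, Dict


def summarize_job_response(response: Dict[str, Any]) -> str:
    """Turn a get_job-style response envelope into a one-line summary."""
    if response.get("status") != "success":
        # Error envelopes carry "message" and "errorCode".
        code = response.get("errorCode", "UNKNOWN")
        return f"error [{code}]: {response.get('message', '')}"

    data = response.get("data", {})
    schedule = data.get("schedule", {})
    interval = schedule.get("executionIntervalInMinutes")
    every = f"every {interval} min" if interval is not None else "no interval"
    return (
        f"{data.get('name')} ({data.get('jobID')}): "
        f"{data.get('completedExecutionsCount', 0)} completed runs, "
        f"mode={schedule.get('mode')}, {every}"
    )


# Hypothetical payloads; the values are invented for illustration.
ok = {
    "status": "success",
    "data": {
        "jobID": "123e4567-e89b-12d3-a456-426614174000",
        "name": "nightly-report",
        "completedExecutionsCount": 12,
        "schedule": {"mode": "Recurring", "executionIntervalInMinutes": 60},
    },
}
err = {
    "status": "error",
    "message": "Job with ID 'missing' not found.",
    "errorCode": "JOB_NOT_FOUND",
}

print(summarize_job_response(ok))
print(summarize_job_response(err))
```

Checking `status` first keeps the success path free of defensive lookups for keys that only exist on error responses.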
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| ctx | No | | |
| job_id | Yes | UUID of the scheduled job to retrieve details for | |
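Because `job_id` is documented as a UUID, a client could reject malformed IDs before calling the tool. The check below is an optional client-side sketch using Python's standard `uuid` module. `is_valid_job_id` is a hypothetical helper, and nothing in the handler shown on this page performs this validation.

```python
import uuid


def is_valid_job_id(job_id: str) -> bool:
    """Return True if job_id parses as a UUID in canonical hyphenated form."""
    try:
        parsed = uuid.UUID(job_id)
    except (ValueError, AttributeError, TypeError):
        # Raised for malformed strings and for non-string inputs.
        return False
    # uuid.UUID also accepts braces, a urn: prefix, and unhyphenated hex;
    # comparing with the canonical string keeps only the hyphenated form.
    return str(parsed) == job_id.lower()


print(is_valid_job_id("123e4567-e89b-12d3-a456-426614174000"))
print(is_valid_job_id("not-a-uuid"))
```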
Implementation Reference
- src/api/tools/jobs/jobs.py:94-157 (handler)
  Handler function for retrieving job details by job_id. This is the core implementation of the job details tool logic.

  ```python
  async def get_job(
      ctx: Context,
      job_id: str,
  ) -> dict:
      """
      Retrieve details of a scheduled job by its ID.

      Args:
          ctx: Context object
          job_id: ID of the job to retrieve

      Returns:
          Dict with job details or error info
      """
      settings = config.get_settings()
      start_time = time.time()
      user_id = config.get_user_id()

      try:
          jobs_manager = utils.get_org_jobs_manager()
          job_obj = jobs_manager.get(job_id)

          if not job_obj:
              return {
                  "status": "error",
                  "message": f"Job with ID '{job_id}' not found.",
                  "errorCode": "JOB_NOT_FOUND",
              }

          settings.analytics_manager.track_event(
              user_id,
              "tool_calling",
              {
                  "name": "get_job",
                  "job_id": job_id,
              },
          )

          execution_time = (time.time() - start_time) * 1000
          return {
              "status": "success",
              "message": f"Job '{job_obj.name}' retrieved successfully.",
              "data": {
                  "jobID": job_obj.job_id,
                  "name": job_obj.name,
                  "description": job_obj.description,
                  "completedExecutionsCount": job_obj.completed_executions_count,
                  "schedule": {
                      "mode": job_obj.schedule.mode.value,
                      "executionIntervalInMinutes": job_obj.schedule.execution_interval_in_minutes,
                  },
                  "createdAt": job_obj.created_at,
                  "terminatedAt": job_obj.terminated_at,
              },
              "metadata": {
                  "executionTimeMs": round(execution_time, 2),
                  "timestamp": datetime.now(timezone.utc).isoformat(),
              },
          }
      except Exception as e:
          logger.error(f"Error retrieving job: {str(e)}")
          return {
              "status": "error",
              "message": f"Failed to retrieve job: {str(e)}",
              "errorCode": "JOB_GET_FAILED",
              "errorDetails": {"exception_type": type(e).__name__},
          }
  ```
- src/api/tools/tools.py:23-53 (registration)
  Imports the get_job handler and registers it in the central tools list, which is used for MCP tool registration. The tool name will be 'get_job', based on the function name.

  ```python
  from src.api.tools.jobs import create_job_from_notebook, delete_job, get_job
  from src.api.tools.organization import (
      organization_info,
      choose_organization,
      set_organization,
  )

  # Define the tools with their metadata
  tools_definition = [
      {"func": get_user_info},
      {"func": organization_info},
      {"func": choose_organization},
      {"func": set_organization},
      {"func": workspace_groups_info},
      {"func": workspaces_info},
      {"func": resume_workspace},
      {"func": list_starter_workspaces},
      {"func": create_starter_workspace},
      {"func": terminate_starter_workspace},
      {"func": list_regions},
      {"func": list_sharedtier_regions},
      {"func": run_sql},
      {"func": create_notebook_file},
      {"func": upload_notebook_file},
      {"func": create_job_from_notebook},
      {"func": get_job},
      {"func": delete_job},
  ]

  # Export the tools
  tools = [Tool.create_from_dict(tool) for tool in tools_definition]
  ```
- src/api/tools/jobs/utils.py:4-9 (helper)
  Helper function to get the organization's jobs manager, used in the get_job handler.

  ```python
  def get_org_jobs_manager():
      org = org_utils.fetch_organization()
      if not org:
          raise ValueError("Organization not found. Please ensure you are logged in.")
      return org.jobs
  ```
- src/api/tools/registery.py:26-67 (registration)
  Registers all tools from the tools list to the MCP server, using the function name as the tool name (thus 'get_job').

  ```python
  def register_tools(mcp: FastMCP, **filter_flags) -> None:
      """
      Register tools with the MCP server with optional filtering.

      Args:
          mcp: FastMCP server instance
          **filter_flags: Optional flag filters to apply

      Examples:
          # Register all public tools (default)
          register_tools(mcp)

          # Register only public tools explicitly
          register_tools(mcp, private=False, deprecated=False)
      """
      # Import here to avoid circular imports
      from src.config.config import get_settings, LocalSettings

      # Default: only register public tools (non-private, non-deprecated)
      if not filter_flags:
          filter_flags = {"internal": False, "deprecated": False}

      filtered_tools: List[Tool] = filter_tools(**filter_flags)

      # Check if we're using API key authentication in local mode
      settings = get_settings()
      using_api_key = (
          not settings.is_remote
          and isinstance(settings, LocalSettings)
          and (settings.api_key or settings.jwt_token and settings.org_id)
      )

      # List of tools to exclude when using API key authentication
      api_key_excluded_tools = ["choose_organization", "set_organization"]

      for tool in filtered_tools:
          func = tool.func

          # Skip organization-related tools when using API key authentication
          if using_api_key and func.__name__ in api_key_excluded_tools:
              continue

          mcp.tool(name=func.__name__, description=func.__doc__)(func)
  ```
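The registration step above derives each tool's name from `func.__name__` and its description from `func.__doc__`, which is why the handler is exposed as 'get_job'. The sketch below reproduces that pattern in isolation. `MiniRegistry` is a hypothetical stand-in and not the FastMCP API; it only mimics the `tool(name=..., description=...)(func)` call shape seen in `register_tools`. The two handlers are simplified placeholders.

```python
from typing import Callable, Dict, Iterable, Optional


class MiniRegistry:
    """Hypothetical stand-in for an MCP server; not the FastMCP API."""

    def __init__(self) -> None:
        self.tools: Dict[str, dict] = {}

    def tool(self, name: str, description: Optional[str]) -> Callable:
        # Decorator factory: returns a decorator that records the function.
        def decorator(func: Callable) -> Callable:
            self.tools[name] = {"func": func, "description": description}
            return func

        return decorator


def get_job(job_id: str) -> dict:
    """Retrieve details of a scheduled job by its ID."""
    return {"status": "success", "data": {"jobID": job_id}}


def choose_organization() -> dict:
    """Placeholder for an organization-selection tool."""
    return {"status": "success"}


def register(
    registry: MiniRegistry,
    funcs: Iterable[Callable],
    excluded: Iterable[str] = (),
) -> None:
    """Register each function under its own name, skipping excluded names."""
    skip = set(excluded)
    for func in funcs:
        if func.__name__ in skip:
            continue
        registry.tool(name=func.__name__, description=func.__doc__)(func)


registry = MiniRegistry()
register(registry, [get_job, choose_organization], excluded=["choose_organization"])
print(sorted(registry.tools))  # ['get_job']
print(registry.tools["get_job"]["description"])
```

Deriving the name from the function keeps the tools list free of duplicated string literals, at the cost that renaming a handler silently renames the tool that clients see.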