get_job_details

Retrieve detailed information about a scheduled notebook job by specifying the job's UUID, including the job ID, name, description, timestamps, execution counts, configuration settings, and metadata.

Instructions

Retrieve comprehensive information about a scheduled notebook job. Returns a dictionary with the following fields:

- jobID: Unique identifier (UUID format)
- name: Display name of the job
- description: Human-readable job description
- createdAt: Creation timestamp (ISO 8601)
- terminatedAt: End timestamp if completed
- completedExecutionsCount: Number of successful runs
- enqueuedBy: User ID who created the job
- executionConfig: Notebook path and runtime settings
- schedule: Mode, interval, and start time
- targetConfig: Database and workspace settings
- jobMetadata: Execution statistics and status

Args:
- job_id: UUID of the scheduled job to retrieve details for
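For illustration, a successful response is a dictionary shaped roughly as follows (all values are placeholders; the fields mirror the handler shown under Implementation Reference below, which does not populate every field listed above):

    {
        "status": "success",
        "message": "Job 'nightly-etl' retrieved successfully.",
        "data": {
            "jobID": "123e4567-e89b-12d3-a456-426614174000",
            "name": "nightly-etl",
            "description": "Nightly ETL notebook run",
            "completedExecutionsCount": 42,
            "schedule": {
                "mode": "Recurring",
                "executionIntervalInMinutes": 1440,
            },
            "createdAt": "2024-01-01T00:00:00Z",
            "terminatedAt": None,
        },
        "metadata": {
            "executionTimeMs": 12.34,
            "timestamp": "2024-01-02T00:00:00Z",
        },
    }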

Input Schema

Name      Required   Description                                           Default
ctx       No         Context object
job_id    Yes        UUID of the scheduled job to retrieve details for
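
A minimal input sketch: only job_id is passed by the caller (the UUID is a placeholder); ctx is typically injected by the MCP framework rather than supplied in the arguments.

    {
        "job_id": "123e4567-e89b-12d3-a456-426614174000"
    }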

Implementation Reference

  • Handler function for retrieving job details by job_id; this is the core implementation of the job details tool logic (a usage sketch appears after this list).
    async def get_job(
        ctx: Context,
        job_id: str,
    ) -> dict:
        """
        Retrieve details of a scheduled job by its ID.

        Args:
            ctx: Context object
            job_id: ID of the job to retrieve

        Returns:
            Dict with job details or error info
        """
        settings = config.get_settings()
        start_time = time.time()
        user_id = config.get_user_id()

        try:
            jobs_manager = utils.get_org_jobs_manager()
            job_obj = jobs_manager.get(job_id)
            if not job_obj:
                return {
                    "status": "error",
                    "message": f"Job with ID '{job_id}' not found.",
                    "errorCode": "JOB_NOT_FOUND",
                }

            settings.analytics_manager.track_event(
                user_id,
                "tool_calling",
                {
                    "name": "get_job",
                    "job_id": job_id,
                },
            )

            execution_time = (time.time() - start_time) * 1000
            return {
                "status": "success",
                "message": f"Job '{job_obj.name}' retrieved successfully.",
                "data": {
                    "jobID": job_obj.job_id,
                    "name": job_obj.name,
                    "description": job_obj.description,
                    "completedExecutionsCount": job_obj.completed_executions_count,
                    "schedule": {
                        "mode": job_obj.schedule.mode.value,
                        "executionIntervalInMinutes": job_obj.schedule.execution_interval_in_minutes,
                    },
                    "createdAt": job_obj.created_at,
                    "terminatedAt": job_obj.terminated_at,
                },
                "metadata": {
                    "executionTimeMs": round(execution_time, 2),
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                },
            }
        except Exception as e:
            logger.error(f"Error retrieving job: {str(e)}")
            return {
                "status": "error",
                "message": f"Failed to retrieve job: {str(e)}",
                "errorCode": "JOB_GET_FAILED",
                "errorDetails": {"exception_type": type(e).__name__},
            }
  • Imports the get_job handler and registers it in the central tools list, which is used for MCP tool registration. The tool name will be 'get_job', derived from the function name (a hypothetical sketch of the Tool wrapper appears after this list).
    from src.api.tools.jobs import create_job_from_notebook, delete_job, get_job
    from src.api.tools.organization import (
        organization_info,
        choose_organization,
        set_organization,
    )

    # Define the tools with their metadata
    tools_definition = [
        {"func": get_user_info},
        {"func": organization_info},
        {"func": choose_organization},
        {"func": set_organization},
        {"func": workspace_groups_info},
        {"func": workspaces_info},
        {"func": resume_workspace},
        {"func": list_starter_workspaces},
        {"func": create_starter_workspace},
        {"func": terminate_starter_workspace},
        {"func": list_regions},
        {"func": list_sharedtier_regions},
        {"func": run_sql},
        {"func": create_notebook_file},
        {"func": upload_notebook_file},
        {"func": create_job_from_notebook},
        {"func": get_job},
        {"func": delete_job},
    ]

    # Export the tools
    tools = [Tool.create_from_dict(tool) for tool in tools_definition]
  • Helper function to get the organization's jobs manager, used in the get_job handler. If no organization is found, the ValueError it raises is caught by get_job's except block and surfaced as a JOB_GET_FAILED error.
    def get_org_jobs_manager():
        org = org_utils.fetch_organization()
        if not org:
            raise ValueError("Organization not found. Please ensure you are logged in.")
        return org.jobs
  • Registers the filtered tools from the tools list with the MCP server, using each function's name as the tool name (hence 'get_job'); a wiring sketch appears after this list.
    def register_tools(mcp: FastMCP, **filter_flags) -> None:
        """
        Register tools with the MCP server with optional filtering.

        Args:
            mcp: FastMCP server instance
            **filter_flags: Optional flag filters to apply

        Examples:
            # Register all public tools (default)
            register_tools(mcp)

            # Register only public tools explicitly
            register_tools(mcp, private=False, deprecated=False)
        """
        # Import here to avoid circular imports
        from src.config.config import get_settings, LocalSettings

        # Default: only register public tools (non-private, non-deprecated)
        if not filter_flags:
            filter_flags = {"internal": False, "deprecated": False}

        filtered_tools: List[Tool] = filter_tools(**filter_flags)

        # Check if we're using API key authentication in local mode
        settings = get_settings()
        using_api_key = (
            not settings.is_remote
            and isinstance(settings, LocalSettings)
            and (settings.api_key or settings.jwt_token and settings.org_id)
        )

        # List of tools to exclude when using API key authentication
        api_key_excluded_tools = ["choose_organization", "set_organization"]

        for tool in filtered_tools:
            func = tool.func

            # Skip organization-related tools when using API key authentication
            if using_api_key and func.__name__ in api_key_excluded_tools:
                continue

            mcp.tool(name=func.__name__, description=func.__doc__)(func)
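
As a usage sketch for the get_job handler above: the handler is ordinarily invoked through the MCP server rather than called directly, ctx is injected by FastMCP, and the UUID is a placeholder.

    # Sketch only: run inside an async context with ctx supplied by FastMCP.
    result = await get_job(ctx, job_id="123e4567-e89b-12d3-a456-426614174000")
    if result["status"] == "success":
        print(result["data"]["name"], result["data"]["completedExecutionsCount"])
    else:
        print(result["errorCode"], result["message"])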
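
The Tool wrapper used in the tools list above is not shown on this page. A rough, hypothetical sketch of what Tool.create_from_dict plausibly does (the real class likely also carries flags such as internal and deprecated that filter_tools reads):

    from dataclasses import dataclass
    from typing import Callable

    @dataclass
    class Tool:  # hypothetical stand-in for the project's Tool class
        func: Callable
        internal: bool = False
        deprecated: bool = False

        @classmethod
        def create_from_dict(cls, d: dict) -> "Tool":
            # Build a Tool from an entry like {"func": get_job},
            # defaulting any metadata flags that are not supplied.
            return cls(**d)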
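
Finally, a minimal wiring sketch, assuming FastMCP from the MCP Python SDK and the register_tools helper above; the server name is a placeholder:

    from mcp.server.fastmcp import FastMCP

    mcp = FastMCP("singlestore-mcp-server")  # placeholder server name

    # Register only public tools; get_job is exposed under its function name.
    register_tools(mcp, internal=False, deprecated=False)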

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/singlestore-labs/mcp-server-singlestore'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.