tail_job_output

Retrieve the last N lines of stdout and stderr for a specific job on the MCP Background Job Server, enabling real-time monitoring and debugging of long-running processes.

Instructions

Get the last N lines of stdout and stderr from a job.

Args:
  job_id: The UUID of the job to tail
  lines: Number of lines to return (1-1000, default 50)

Returns: ProcessOutput containing the last N lines of stdout and stderr

Input Schema

Name | Required | Description | Default
job_id | Yes | Job ID to tail |
lines | No | Number of lines to return (1-1000) | 50
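
As a rough illustration of how the two parameters map onto a request, the sketch below builds a JSON-RPC `tools/call` message of the kind an MCP client sends. The helper name `build_tail_request`, the request id, the placeholder job ID, and the client-side range check are assumptions for illustration; the server applies its own validation regardless.

```python
import json


def build_tail_request(job_id: str, lines: int = 50, request_id: int = 1) -> str:
    """Build a JSON-RPC 2.0 `tools/call` message for tail_job_output.

    Hypothetical helper: it mirrors the documented 1-1000 range on the
    client side so obviously invalid requests fail before they are sent.
    """
    if not 1 <= lines <= 1000:
        raise ValueError("lines must be between 1 and 1000")
    message = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "tail_job_output",
            "arguments": {"job_id": job_id, "lines": lines},
        },
    }
    return json.dumps(message)


# Placeholder job ID; a real one comes from the server when a job is started.
payload = build_tail_request("00000000-0000-0000-0000-000000000000", lines=20)
print(payload)
```

Omitting `lines` in the helper falls back to 50, matching the documented default.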

Implementation Reference

  • MCP tool registration (@mcp.tool()) and handler function that validates input parameters using Pydantic Fields (schema), retrieves the JobManager, calls its tail_job_output method, and handles exceptions with ToolError.
    @mcp.tool()
    async def tail_job_output(
        job_id: str = Field(..., description="Job ID to tail"),
        lines: int = Field(50, description="Number of lines to return", ge=1, le=1000),
    ) -> ProcessOutput:
        """Get the last N lines of stdout and stderr from a job.

        Args:
            job_id: The UUID of the job to tail
            lines: Number of lines to return (1-1000, default 50)

        Returns:
            ProcessOutput containing the last N lines of stdout and stderr
        """
        try:
            job_manager = get_job_manager()
            job_output = await job_manager.tail_job_output(job_id, lines)
            return job_output
        except KeyError:
            raise ToolError(f"Job {job_id} not found")
        except ValueError as e:
            raise ToolError(f"Invalid parameter: {str(e)}")
        except Exception as e:
            logger.error(f"Error tailing job output for {job_id}: {e}")
            raise ToolError(f"Failed to tail job output: {str(e)}")
  • JobManager.tail_job_output helper method that checks job existence, validates lines parameter, retrieves the ProcessWrapper, and delegates to its tail_output method.
    async def tail_job_output(self, job_id: str, lines: int) -> ProcessOutput:
        """Get last N lines of output.

        Args:
            job_id: Job identifier
            lines: Number of lines to return

        Returns:
            ProcessOutput with last N lines of stdout and stderr

        Raises:
            KeyError: If job_id doesn't exist
            ValueError: If lines is not positive
        """
        if job_id not in self._jobs:
            raise KeyError(f"Job {job_id} not found")

        if lines <= 0:
            raise ValueError("Number of lines must be positive")

        process_wrapper = self._processes.get(job_id)
        if process_wrapper is None:
            return ProcessOutput(stdout="", stderr="")

        return process_wrapper.tail_output(lines)
  • ProcessWrapper.tail_output core implementation that thread-safely extracts the last N lines from stdout and stderr deque ring buffers and joins them into ProcessOutput.
    def tail_output(self, lines: int) -> ProcessOutput:
        """Get last N lines of output.

        Args:
            lines: Number of lines to return from the end

        Returns:
            ProcessOutput containing last N lines of stdout and stderr
        """
        with self._buffer_lock:
            # Get last N lines from each buffer
            stdout_lines = list(self.stdout_buffer)[-lines:] if lines > 0 else []
            stderr_lines = list(self.stderr_buffer)[-lines:] if lines > 0 else []

        return ProcessOutput(
            stdout="\n".join(stdout_lines), stderr="\n".join(stderr_lines)
        )
  • Pydantic BaseModel defining the output schema (ProcessOutput) returned by the tool, with stdout and stderr fields.
    class ProcessOutput(BaseModel):
        """Structured stdout/stderr output from a process."""

        stdout: str = Field(..., description="Standard output content")
        stderr: str = Field(..., description="Standard error content")
  • The @mcp.tool() decorator registers 'tail_job_output' as an MCP tool with the FastMCP server instance. The registration code is the handler definition shown in the first item of this list.
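
The buffer handling described in the items above can be tried in isolation. The following is a minimal sketch rather than the project's code: `TailBuffer`, `Output`, the `append` method, and the `max_lines` capacity are hypothetical stand-ins, and a plain dataclass replaces the Pydantic model so the example needs only the standard library.

```python
import threading
from collections import deque
from dataclasses import dataclass


@dataclass
class Output:
    """Stand-in for the ProcessOutput model (a dataclass, not Pydantic)."""
    stdout: str
    stderr: str


class TailBuffer:
    """Hypothetical stand-in for the wrapper's stdout/stderr ring buffers."""

    def __init__(self, max_lines: int = 1000) -> None:
        # deque(maxlen=...) drops the oldest line once the buffer is full.
        # The capacity used by the real server is not shown in the source.
        self.stdout_buffer = deque(maxlen=max_lines)
        self.stderr_buffer = deque(maxlen=max_lines)
        self._buffer_lock = threading.Lock()

    def append(self, line: str, stream: str = "stdout") -> None:
        """Record one line of output on the chosen stream."""
        with self._buffer_lock:
            if stream == "stdout":
                self.stdout_buffer.append(line)
            else:
                self.stderr_buffer.append(line)

    def tail_output(self, lines: int) -> Output:
        """Return the last `lines` lines of each stream, newline-joined."""
        with self._buffer_lock:
            # Copy under the lock so a writer thread cannot change the
            # deque while it is being sliced.
            stdout_lines = list(self.stdout_buffer)[-lines:] if lines > 0 else []
            stderr_lines = list(self.stderr_buffer)[-lines:] if lines > 0 else []
        return Output("\n".join(stdout_lines), "\n".join(stderr_lines))


buf = TailBuffer(max_lines=3)
for i in range(5):
    buf.append(f"line {i}")
buf.append("warning", stream="stderr")

result = buf.tail_output(2)
print(result.stdout)
print(result.stderr)
```

The `lines > 0` guard matters because a slice of `[-0:]` returns the whole list instead of an empty one.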

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dylan-gluck/mcp-background-job'
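
For reference, the same request can be sketched in Python with the standard library. The helper names `build_request` and `fetch_server_info` and the timeout are illustrative, and parsing the body as JSON is an assumption, since the response format is not described here.

```python
import json
import urllib.request

SERVER_URL = "https://glama.ai/api/mcp/v1/servers/dylan-gluck/mcp-background-job"


def build_request(url: str = SERVER_URL) -> urllib.request.Request:
    """Build the GET request that the curl command above sends."""
    return urllib.request.Request(url, method="GET")


def fetch_server_info(url: str = SERVER_URL, timeout: float = 10.0):
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Only the request object is built here; call fetch_server_info() to
# actually contact the API.
request = build_request()
print(request.get_method(), request.full_url)
```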

If you have feedback or need assistance with the MCP directory API, please join our Discord server.