# get_execution_output
Retrieve log output from Rundeck job executions to monitor progress, debug issues, and track completion status. Supports filtering by node and tailing running executions.
## Instructions

Get the log output from a job execution.

Retrieves log entries from the execution. For running executions, use the `offset` parameter to poll for new output. The `completed` field indicates whether the execution has finished.

**Args:**

- `execution_id`: The execution ID (integer)
- `last_lines`: Return only the last N lines (overrides `offset`)
- `max_lines`: Maximum number of lines to return from `offset`
- `offset`: Byte offset to start reading from (for tailing)
- `node`: Filter output to a specific node

**Returns:** `ExecutionOutput` with log entries and metadata.

**Examples:**

Get all output:

```python
>>> output = get_execution_output(12345)
>>> for entry in output.entries:
...     print(f"[{entry.level}] {entry.log}")
```

Get the last 50 lines:

```python
>>> output = get_execution_output(12345, last_lines=50)
```

Tail a running execution:

```python
>>> output = get_execution_output(12345, offset=0)
>>> while not output.completed:
...     output = get_execution_output(12345, offset=output.offset)
...     # process new entries
```
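When tailing for real, it is worth pausing between polls so the loop doesn't hammer the API. A minimal sketch (the 2-second interval and the print format are arbitrary choices, not part of the tool):

```python
import time

# Poll a running execution until it completes, printing new entries as they arrive.
output = get_execution_output(12345, offset=0)
while True:
    for entry in output.entries:
        print(f"[{entry.level}] {entry.log}")
    if output.completed:
        break
    time.sleep(2)  # arbitrary interval; tune for your job's verbosity
    output = get_execution_output(12345, offset=output.offset)
```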
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| execution_id | Yes | The execution ID (integer) | |
| last_lines | No | Return only the last N lines (overrides `offset`) | None |
| max_lines | No | Maximum number of lines to return from `offset` | None |
| offset | No | Byte offset to start reading from (for tailing) | None |
| node | No | Filter output to a specific node | None |
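For reference, each of these argument sets satisfies the schema above (`execution_id` is the only required field; the values are illustrative):

```python
# Argument payloads that validate against the input schema above.
tail_args = {"execution_id": 12345, "offset": 0}                        # begin tailing
window_args = {"execution_id": 12345, "last_lines": 50}                 # last 50 lines only
node_args = {"execution_id": 12345, "node": "web01", "max_lines": 200}  # one node, capped
```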
## Implementation Reference
- rundeck_mcp/tools/executions.py:86-142 (handler): The primary handler function implementing the `get_execution_output` tool. It fetches log output from the Rundeck API using the provided parameters and parses the response into an `ExecutionOutput` model.

  ```python
  def get_execution_output(
      execution_id: int,
      last_lines: int | None = None,
      max_lines: int | None = None,
      offset: int | None = None,
      node: str | None = None,
  ) -> ExecutionOutput:
      """Get the log output from a job execution.

      Retrieves log entries from the execution. For running executions, use the
      'offset' parameter to poll for new output. The 'completed' field indicates
      whether the execution has finished.

      Args:
          execution_id: The execution ID (integer)
          last_lines: Return only the last N lines (overrides offset)
          max_lines: Maximum number of lines to return from offset
          offset: Byte offset to start reading from (for tailing)
          node: Filter output to a specific node

      Returns:
          ExecutionOutput with log entries and metadata

      Examples:
          Get all output:

          >>> output = get_execution_output(12345)
          >>> for entry in output.entries:
          ...     print(f"[{entry.level}] {entry.log}")

          Get last 50 lines:

          >>> output = get_execution_output(12345, last_lines=50)

          Tail running execution:

          >>> output = get_execution_output(12345, offset=0)
          >>> while not output.completed:
          ...     output = get_execution_output(12345, offset=output.offset)
          ...     # process new entries
      """
      client = get_client()

      # Build path with optional node filter
      path = f"/execution/{execution_id}/output"
      if node:
          path = f"/execution/{execution_id}/output/node/{node}"

      # Build parameters
      params: dict[str, Any] = {}
      if last_lines is not None:
          params["lastlines"] = last_lines
      if max_lines is not None:
          params["maxlines"] = max_lines
      if offset is not None:
          params["offset"] = offset

      response = client.get(path, params=params)
      return _parse_execution_output(execution_id, response)
  ```
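  Reading the branches above, a call with a node filter and a line cap resolves to a node-scoped path plus query parameters; for instance (derived from the handler's logic, not from an API call):

  ```python
  # get_execution_output(12345, last_lines=50, node="web01") builds:
  execution_id, node = 12345, "web01"
  path = f"/execution/{execution_id}/output/node/{node}"
  params = {"lastlines": 50}
  # -> GET /execution/12345/output/node/web01?lastlines=50
  ```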
- Pydantic model defining the structure and validation for the tool's output, including log entries and execution metadata.

  ```python
  class ExecutionOutput(BaseModel):
      """Output/logs from a job execution.

      Contains the log entries and metadata about the output retrieval.
      Use the 'completed' field to determine if the execution has finished.
      """

      id: int = Field(description="The execution ID")
      offset: int = Field(default=0, description="Byte offset in the log file")
      completed: bool = Field(description="Whether the execution has completed")
      exec_completed: bool = Field(
          default=False,
          alias="execCompleted",
          description="Whether execution is complete",
      )
      has_more_output: bool = Field(
          default=False,
          alias="hasMoreOutput",
          description="Whether more output is available",
      )
      exec_state: str | None = Field(
          default=None,
          alias="execState",
          description="Current execution state",
      )
      exec_duration: int | None = Field(
          default=None,
          alias="execDuration",
          description="Execution duration in milliseconds",
      )
      percent_loaded: float | None = Field(
          default=None,
          alias="percentLoaded",
          description="Percentage of output loaded (0-100)",
      )
      total_size: int | None = Field(
          default=None,
          alias="totalSize",
          description="Total size of log file in bytes",
      )
      entries: list[LogEntry] = Field(default_factory=list, description="Log entries")

      @computed_field
      @property
      def output_summary(self) -> str:
          """Generate a summary of the output."""
          status = "COMPLETE" if self.completed else "IN PROGRESS"
          lines = [f"Execution Output (ID: {self.id}) - {status}"]
          if self.exec_duration:
              seconds = self.exec_duration / 1000
              lines.append(f"Duration: {seconds:.1f}s")
          if self.percent_loaded is not None:
              lines.append(f"Loaded: {self.percent_loaded:.1f}%")
          lines.append(f"Log entries: {len(self.entries)}")
          if self.has_more_output:
              lines.append("NOTE: More output available (use offset parameter)")
          return "\n".join(lines)
  ```
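  A quick check of the `output_summary` computed field (a sketch; it assumes the model above is in scope and populates the aliased fields by alias, which pydantic accepts by default):

  ```python
  # Construct a finished execution by alias and render its summary.
  out = ExecutionOutput(
      id=12345,
      offset=2048,
      completed=True,
      execCompleted=True,
      execDuration=4200,
      percentLoaded=100.0,
  )
  print(out.output_summary)
  # Execution Output (ID: 12345) - COMPLETE
  # Duration: 4.2s
  # Loaded: 100.0%
  # Log entries: 0
  ```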
- rundeck_mcp/server.py:108-110 (registration): MCP server registration loop that adds `get_execution_output` (via the `read_tools` list) as a read-only tool with appropriate annotations.

  ```python
  for tool in read_tools:
      add_read_only_tool(mcp, tool)
  ```
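  The repository's `add_read_only_tool` implementation isn't shown here. As a rough sketch of what such a helper could look like with the official Python MCP SDK (the `FastMCP.add_tool` call and the `ToolAnnotations` hints are assumptions about the SDK in use, not code from this repo):

  ```python
  from mcp.server.fastmcp import FastMCP
  from mcp.types import ToolAnnotations

  def add_read_only_tool(mcp: FastMCP, tool) -> None:
      # Hypothetical: register the callable as an MCP tool and mark it
      # read-only so clients know it performs no mutations.
      mcp.add_tool(
          tool,
          annotations=ToolAnnotations(readOnlyHint=True, destructiveHint=False),
      )
  ```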
- Helper function that parses the raw API response into the structured `ExecutionOutput` model.

  ```python
  def _parse_execution_output(execution_id: int, data: dict[str, Any]) -> ExecutionOutput:
      """Parse execution output data from API response.

      Args:
          execution_id: The execution ID
          data: Raw API response data

      Returns:
          Parsed ExecutionOutput model
      """
      # Parse log entries
      entries_data = data.get("entries", [])
      entries = [_parse_log_entry(entry) for entry in entries_data]

      return ExecutionOutput(
          id=execution_id,
          offset=data.get("offset", 0),
          completed=data.get("completed", False),
          exec_completed=data.get("execCompleted", False),
          has_more_output=data.get("hasMoreOutput", False),
          exec_state=data.get("execState"),
          exec_duration=data.get("execDuration"),
          percent_loaded=data.get("percentLoaded"),
          total_size=data.get("totalSize"),
          entries=entries,
      )
  ```
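  For example, feeding the parser a hand-written response in the shape it expects (the key names come from the parser above; the values are illustrative, and the unlisted `_parse_log_entry` helper is assumed to map entry keys onto `LogEntry`):

  ```python
  sample = {
      "offset": 2048,
      "completed": True,
      "execCompleted": True,
      "execState": "succeeded",
      "execDuration": 4200,
      "percentLoaded": 100.0,
      "totalSize": 2048,
      "entries": [
          {"time": "13:02:01", "level": "NORMAL", "log": "Deploy finished", "node": "web01"},
      ],
  }
  output = _parse_execution_output(12345, sample)
  assert output.completed and output.entries[0].node == "web01"
  ```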
- Pydantic model for individual log entries contained in the `ExecutionOutput`.

  ```python
  class LogEntry(BaseModel):
      """A single log entry from an execution."""

      time: str | None = Field(default=None, description="Timestamp of the log entry")
      absolute_time: str | None = Field(
          default=None,
          alias="absolute_time",
          description="Absolute timestamp",
      )
      level: str = Field(default="NORMAL", description="Log level (e.g., NORMAL, ERROR, WARN, DEBUG)")
      log: str = Field(description="The log message content")
      node: str | None = Field(default=None, description="Node that produced this log entry")
      step: str | None = Field(default=None, alias="stepctx", description="Step context identifier")
      user: str | None = Field(default=None, description="User associated with this log entry")
  ```
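  With these fields, post-processing the log is straightforward; for instance, grouping problem lines by node (a sketch that assumes an `output` from a prior `get_execution_output` call is in scope):

  ```python
  from collections import defaultdict

  # Collect WARN/ERROR entries per node using the LogEntry fields above.
  by_node: dict[str, list[LogEntry]] = defaultdict(list)
  for entry in output.entries:
      if entry.level in {"WARN", "ERROR"}:
          by_node[entry.node or "unknown"].append(entry)
  for node_name, problems in sorted(by_node.items()):
      print(f"{node_name}: {len(problems)} problem line(s)")
  ```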