peek
Observe running child agent processes for a short time, returning natural-language message events and optional normalized tool call events.
Instructions
One-shot short observation window for running child agents. Returns only natural-language message events, and optionally normalized tool_call events, observed during this call; not a history API, not gapless streaming, and not stdout/stderr tailing. In v1, message extraction is supported for Codex, Claude, OpenCode, Gemini, and best-effort Forge Summary/Completed successfully lines. Forge tool calls are low-precision Execute/Finished markers and never include command output. Tool calls exclude raw tool output.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| pids | Yes | Process IDs returned by run. Duplicates are deduplicated server-side, preserving first occurrence order. Unknown PIDs are returned per process as not_found. | |
| peek_time_sec | No | Optional positive integer observation window in seconds. Defaults to 10; maximum is 60. | |
| include_tool_calls | No | Optional: include normalized tool_call events without raw tool output. Defaults to false. |
Implementation Reference
- src/app/mcp.ts:427-454 (handler)MCP tool handler for 'peek' - validates input params (pids, peek_time_sec, include_tool_calls) and delegates to processService.peekProcesses()
private async handlePeek(toolArguments: any): Promise<ServerResult> { let pids: number[]; let peekTimeSec: number; let includeToolCalls: boolean; try { pids = validatePeekPids(toolArguments.pids); peekTimeSec = validatePeekTimeSec(toolArguments.peek_time_sec); if (toolArguments.include_tool_calls !== undefined && typeof toolArguments.include_tool_calls !== 'boolean') { throw new Error('include_tool_calls must be a boolean when provided'); } includeToolCalls = toolArguments.include_tool_calls === true; } catch (error: any) { throw new McpError(ErrorCode.InvalidParams, error.message); } try { const response = await this.processService.peekProcesses(pids, peekTimeSec, includeToolCalls); return { content: [{ type: 'text', text: JSON.stringify(response, null, 2) }] }; } catch (error: any) { throw new McpError(ErrorCode.InternalError, `Failed to peek processes: ${error.message}`); } } - src/process-service.ts:229-309 (handler)Core business logic for peek - attaches stdout/stderr listeners, observes processes for peekTimeSec (or until they terminate), extracts events via PeekEventExtractor, and returns the results
async peekProcesses(pids: number[], peekTimeSec = 10, includeToolCalls = false): Promise<PeekResponse> { const targetPids = validatePeekPids(pids); const targetPeekTimeSec = validatePeekTimeSec(peekTimeSec); const processes: PeekProcessResult[] = []; const observers: Array<{ entry: TrackedProcess; result: PeekProcessResult; stdoutExtractor: PeekEventExtractor; stderrExtractor: PeekEventExtractor; onStdout: (data: Buffer | string) => void; onStderr: (data: Buffer | string) => void; }> = []; for (const pid of targetPids) { const entry = this.processManager.get(pid); if (!entry) { processes.push(buildNotFoundPeekProcess(pid)); continue; } const result: PeekProcessResult = { pid, agent: entry.toolType, status: entry.status, events: [], truncated: false, error: null, }; processes.push(result); const stdoutExtractor = new PeekEventExtractor(entry.toolType, { includeToolCalls, source: 'stdout' }); const stderrExtractor = new PeekEventExtractor(entry.toolType, { includeToolCalls, source: 'stderr' }); const onStdout = (data: Buffer | string) => { appendPeekEvents(result, stdoutExtractor.push(data.toString(), new Date().toISOString())); }; const onStderr = (data: Buffer | string) => { appendPeekEvents(result, stderrExtractor.push(data.toString(), new Date().toISOString())); }; if (entry.status === 'running') { entry.process.stdout?.on('data', onStdout); entry.process.stderr?.on('data', onStderr); } observers.push({ entry, result, stdoutExtractor, stderrExtractor, onStdout, onStderr }); } const startedAt = new Date(); const startedAtMs = Date.now(); const runningObservers = observers.filter((observer) => observer.entry.status === 'running'); const terminalPromise = Promise.all(runningObservers.map((observer) => this.waitForProcessTerminal(observer.entry))); let timeoutHandle: ReturnType<typeof setTimeout> | undefined; const timeoutPromise = new Promise<void>((resolve) => { timeoutHandle = setTimeout(resolve, targetPeekTimeSec * 1000); timeoutHandle.unref?.(); }); try { await Promise.race([terminalPromise, timeoutPromise]); } finally { if (timeoutHandle) { clearTimeout(timeoutHandle); } const flushTs = new Date().toISOString(); for (const observer of observers) { observer.entry.process.stdout?.off('data', observer.onStdout); observer.entry.process.stderr?.off('data', observer.onStderr); const terminal = observer.entry.status !== 'running'; appendPeekEvents(observer.result, observer.stdoutExtractor.flush(flushTs, { terminal })); appendPeekEvents(observer.result, observer.stderrExtractor.flush(flushTs, { terminal })); observer.result.status = observer.entry.status; } } return { peek_started_at: startedAt.toISOString(), observed_duration_sec: observedDurationSec(startedAtMs), processes, }; } - src/parsers.ts:356-574 (helper)PeekEventExtractor class - parses stdout/stderr streams into PeekEvent[] (message and tool_call events), with agent-specific extraction logic for Claude, Codex, Gemini, Forge, and OpenCode
export class PeekEventExtractor { private pending = ''; private geminiAssistantBuffer = ''; private readonly includeToolCalls: boolean; private readonly source: 'stdout' | 'stderr'; private readonly toolMemory = new Map<string, ToolCallMemory>(); private forgePendingTool: PendingForgeTool | null = null; private forgeToolSequence = 0; constructor(private readonly agent: PeekAgent, options: PeekEventExtractorOptions = {}) { this.includeToolCalls = options.includeToolCalls === true; this.source = options.source || 'stdout'; } push(chunk: string, observedAt = new Date().toISOString()): PeekEvent[] { if (this.agent === 'forge' && this.source === 'stderr') { return []; } if (!chunk) { return []; } const lines = `${this.pending}${chunk}`.split(/\r?\n/); this.pending = lines.pop() || ''; return this.extractLines(lines, observedAt); } flush(observedAt = new Date().toISOString(), options: PeekFlushOptions = {}): PeekEvent[] { if (this.agent === 'forge' && this.source === 'stderr') { this.pending = ''; return []; } const events: PeekEvent[] = []; if (this.pending) { if (this.agent !== 'forge' || options.terminal === true) { const line = this.pending; this.pending = ''; events.push(...this.extractLines([line], observedAt)); } } events.push(...this.flushGeminiAssistantBuffer(observedAt)); events.push(...this.flushForgePendingTool(observedAt, options.terminal === true)); return events; } private extractLines(lines: string[], observedAt: string): PeekEvent[] { if (this.agent === 'forge') { return this.extractForgeLines(lines, observedAt); } const events: PeekEvent[] = []; for (const line of lines) { if (!line.trim()) { continue; } try { events.push(...this.extractParsedEvent(JSON.parse(line), observedAt)); } catch { debugLog(`[Debug] Skipping invalid peek JSON line: ${line}`); events.push(...this.flushGeminiAssistantBuffer(observedAt)); } } return events; } private extractForgeLines(lines: string[], observedAt: string): PeekEvent[] { const events: PeekEvent[] = []; for (const line of lines) { if (!line.trim()) { continue; } const summary = this.extractForgeMessage(line, 'Summary:'); if (summary !== null) { events.push({ kind: 'message', ts: observedAt, text: summary }); continue; } const completed = this.extractForgeMessage(line, 'Completed successfully:'); if (completed !== null) { events.push({ kind: 'message', ts: observedAt, text: completed }); continue; } if (this.includeToolCalls) { const executeMatch = line.match(FORGE_EXECUTE_PATTERN); if (executeMatch) { events.push(...this.completeForgePendingTool(observedAt)); const [, rawTool, rawSummary] = executeMatch; const tool = rawTool.trim() && !/\s/.test(rawTool.trim()) ? rawTool.trim() : 'shell'; const event = createToolCallEvent({ ts: observedAt, phase: 'started', id: `forge_${this.forgeToolSequence++}`, tool, command: rawSummary, }); this.forgePendingTool = { id: event.id!, tool: event.tool, summary: event.summary, summary_truncated: event.summary_truncated, }; events.push(event); continue; } if (FORGE_FINISHED_PATTERN.test(line)) { events.push(...this.completeForgePendingTool(observedAt)); } } } return events; } private extractForgeMessage(line: string, prefix: string): string | null { if (!line.startsWith(prefix)) { return null; } const text = line.slice(prefix.length).trim(); return text || null; } private extractParsedEvent(parsed: any, observedAt: string): PeekEvent[] { if (this.agent === 'gemini') { const events = this.extractGeminiParsedEvent(parsed, observedAt); return events; } return extractPeekEventsFromParsedEvent(this.agent, parsed, observedAt, this.includeToolCalls, this.toolMemory); } private extractGeminiParsedEvent(parsed: any, observedAt: string): PeekEvent[] { if (isGeminiAssistantMessageEvent(parsed)) { this.geminiAssistantBuffer += parsed.content; return []; } const events = this.flushGeminiAssistantBuffer(observedAt); if (this.includeToolCalls && parsed.type === 'tool_use') { const event = createToolCallEvent({ ts: observedAt, phase: 'started', id: parsed.tool_id, tool: parsed.tool_name || parsed.name || 'tool_use', command: parsed.parameters?.command, }); rememberToolCall(event, this.toolMemory); events.push(event); } else if (this.includeToolCalls && parsed.type === 'tool_result') { events.push(createRememberedCompletion({ ts: observedAt, id: parsed.tool_id, memory: this.toolMemory, fallbackTool: parsed.tool_name || parsed.name || 'tool_result', status: parsed.status, defaultStatus: 'unknown', })); } return events; } private flushGeminiAssistantBuffer(observedAt: string): PeekEvent[] { if (this.agent !== 'gemini' || !this.geminiAssistantBuffer) { return []; } const text = this.geminiAssistantBuffer; this.geminiAssistantBuffer = ''; if (!text.trim()) { return []; } return [{ kind: 'message', ts: observedAt, text }]; } private completeForgePendingTool(observedAt: string): PeekEvent[] { if (!this.forgePendingTool) { return []; } const pending = this.forgePendingTool; this.forgePendingTool = null; const event = createToolCallEvent({ ts: observedAt, phase: 'completed', id: pending.id, tool: pending.tool, status: 'unknown', defaultStatus: 'unknown', }); event.summary = pending.summary; if (pending.summary_truncated) { event.summary_truncated = true; } return [event]; } private flushForgePendingTool(observedAt: string, terminal: boolean): PeekEvent[] { if (this.agent !== 'forge' || !terminal) { return []; } return this.completeForgePendingTool(observedAt); } } - src/app/mcp.ts:248-309 (registration)MCP tool registration for 'peek' with name, description, and inputSchema (pids required, peek_time_sec and include_tool_calls optional)
{ name: 'peek', description: 'One-shot short observation window for running child agents. Returns only natural-language message events, and optionally normalized tool_call events, observed during this call; not a history API, not gapless streaming, and not stdout/stderr tailing. In v1, message extraction is supported for Codex, Claude, OpenCode, Gemini, and best-effort Forge Summary/Completed successfully lines. Forge tool calls are low-precision Execute/Finished markers and never include command output. Tool calls exclude raw tool output.', inputSchema: { type: 'object', properties: { pids: { type: 'array', items: { type: 'number' }, description: 'Process IDs returned by run. Duplicates are deduplicated server-side, preserving first occurrence order. Unknown PIDs are returned per process as not_found.', }, peek_time_sec: { type: 'number', description: 'Optional positive integer observation window in seconds. Defaults to 10; maximum is 60.', }, include_tool_calls: { type: 'boolean', description: 'Optional: include normalized tool_call events without raw tool output. Defaults to false.', }, }, required: ['pids'], }, }, { name: 'kill_process', description: 'Terminate a running AI agent process by PID.', inputSchema: { type: 'object', properties: { pid: { type: 'number', description: 'The process ID to terminate.', }, }, required: ['pid'], }, }, { name: 'cleanup_processes', description: 'Remove all completed and failed processes from the process list to free up memory.', inputSchema: { type: 'object', properties: {}, }, }, { name: 'doctor', description: 'Check supported AI CLI binary availability and path resolution. Does not verify login state or terms acceptance.', inputSchema: { type: 'object', properties: {}, }, }, { name: 'models', description: 'List supported model names, model aliases, and dynamic backend discovery hints.', inputSchema: { type: 'object', properties: {}, }, } ], - src/peek.ts:1-95 (schema)Type definitions (PeekProcessResult, PeekResponse), constants (DEFAULT_PEEK_TIME_SEC, MAX_PEEK_TIME_SEC, MAX_PEEK_PIDS, PEEK_MESSAGE_CAP), and validation/helper functions for peek functionality
import type { PeekEvent, PeekMessage } from './parsers.js'; import type { AgentType, ProcessStatus } from './process-service.js'; export const DEFAULT_PEEK_TIME_SEC = 10; export const MAX_PEEK_TIME_SEC = 60; export const MAX_PEEK_PIDS = 32; export const PEEK_MESSAGE_CAP = 50; export type PeekStatus = ProcessStatus | 'not_found'; export type PeekAgent = AgentType | string | null; export interface PeekProcessResult { pid: number; agent: PeekAgent; status: PeekStatus; events: PeekEvent[]; truncated: boolean; error: string | null; } export interface PeekResponse { peek_started_at: string; observed_duration_sec: number; processes: PeekProcessResult[]; } export function validatePeekPids(value: unknown): number[] { if (!Array.isArray(value)) { throw new Error('Missing or invalid required parameter: pids (must be an array of positive safe integers)'); } const deduped: number[] = []; const seen = new Set<number>(); for (const pid of value) { if (typeof pid !== 'number' || !Number.isSafeInteger(pid) || pid <= 0) { throw new Error('All pids must be positive safe integers'); } if (!seen.has(pid)) { seen.add(pid); deduped.push(pid); } } if (deduped.length === 0 || deduped.length > MAX_PEEK_PIDS) { throw new Error(`pids must contain 1..${MAX_PEEK_PIDS} entries after dedupe`); } return deduped; } export function validatePeekTimeSec(value: unknown): number { if (value === undefined || value === null) { return DEFAULT_PEEK_TIME_SEC; } if (typeof value !== 'number' || !Number.isSafeInteger(value) || value <= 0 || value > MAX_PEEK_TIME_SEC) { throw new Error(`peek_time_sec must be a positive integer no greater than ${MAX_PEEK_TIME_SEC}`); } return value; } export function buildNotFoundPeekProcess(pid: number): PeekProcessResult { return { pid, agent: null, status: 'not_found', events: [], truncated: false, error: 'process not found', }; } export function appendPeekEvents(target: PeekProcessResult, events: PeekEvent[]): void { for (const event of events) { if (target.events.length < PEEK_MESSAGE_CAP) { target.events.push(event); } else { target.truncated = true; } } } export function appendPeekMessages(target: PeekProcessResult, messages: PeekMessage[]): void { appendPeekEvents( target, messages.map((message) => ({ kind: 'message' as const, ...message })), ); } export function observedDurationSec(startedAtMs: number, endedAtMs = Date.now()): number { return Number(((endedAtMs - startedAtMs) / 1000).toFixed(2)); }