send
Send a message to the most recent OpenCode session on an instance and receive the response as a real-time stream. Optionally, abort a running task instead of sending a message.
Instructions
Send a message to the most recent opencode session on an instance. Streams the response back in real-time. Set abort=true to stop a running task instead of sending a message.
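The branching between abort mode and send mode can be sketched as a pure function. This is only an illustration of the order of checks in the handler listed under Implementation Reference; `SendArgs`, `SendAction`, and `planSend` are hypothetical names, and `busySessionId` stands in for the status lookup the real handler performs over HTTP.

```typescript
// Hypothetical types and function, for illustration only.
interface SendArgs {
  message?: string
  instance: string
  abort?: boolean
}

type SendAction =
  | { kind: 'abort'; sessionId: string }
  | { kind: 'nothing-to-abort' }
  | { kind: 'missing-message' }
  | { kind: 'busy'; sessionId: string }
  | { kind: 'send'; message: string }

// busySessionId is the session currently running a task, if any.
function planSend(args: SendArgs, busySessionId?: string): SendAction {
  // Abort mode: message is ignored, a busy session is required.
  if (args.abort) {
    return busySessionId
      ? { kind: 'abort', sessionId: busySessionId }
      : { kind: 'nothing-to-abort' }
  }
  // Send mode: message is required.
  if (!args.message) return { kind: 'missing-message' }
  // A busy instance rejects new messages until it is idle or aborted.
  if (busySessionId) return { kind: 'busy', sessionId: busySessionId }
  return { kind: 'send', message: args.message }
}
```

Note that the missing-message check comes before the busy check, so a call with neither `message` nor `abort` is reported as an error even when the instance is busy.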
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| message | No | The message to send (not required when aborting) | |
| instance | Yes | Instance name (exact or fuzzy substring match) | |
| abort | No | Set to true to abort the currently running task | false |
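As a rough illustration of how a client might fill in these parameters, the sketch below builds an MCP `tools/call` JSON-RPC request for `send`. The helper `buildSendCall`, the instance name `my-project`, and the token `send-1` are assumptions made for the example; only the argument names come from the schema above. The progress token is included because the handler streams deltas only when one is present.

```typescript
// Hypothetical helper; the envelope follows the MCP JSON-RPC `tools/call` shape.
interface SendArguments {
  message?: string
  instance: string
  abort?: boolean
}

interface ToolCallRequest {
  jsonrpc: '2.0'
  id: number
  method: 'tools/call'
  params: {
    name: 'send'
    arguments: SendArguments
    _meta?: { progressToken: string | number }
  }
}

function buildSendCall(
  id: number,
  args: SendArguments,
  progressToken?: string | number,
): ToolCallRequest {
  const params: ToolCallRequest['params'] = { name: 'send', arguments: args }
  // Only attach _meta when the caller wants streamed progress notifications.
  if (progressToken !== undefined) params._meta = { progressToken }
  return { jsonrpc: '2.0', id, method: 'tools/call', params }
}

// Send mode: message plus instance, with a progress token for streaming.
const sendRequest = buildSendCall(
  1,
  { instance: 'my-project', message: 'Run the test suite' },
  'send-1',
)

// Abort mode: no message needed.
const abortRequest = buildSendCall(2, { instance: 'my-project', abort: true })
```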
Implementation Reference
- src/tools/simplified.ts:172-352 (handler) The 'send' tool handler: registers an MCP tool named 'send' with schema (message, instance, abort) and handler logic that handles abort mode, message sending, SSE streaming, and response collection.

      server.tool(
        'send',
        'Send a message to the most recent opencode session on an instance. Streams the response back in real-time. Set abort=true to stop a running task instead of sending a message.',
        {
          message: z
            .string()
            .optional()
            .describe('The message to send (not required when aborting)'),
          instance: z
            .string()
            .describe('Instance name (exact or fuzzy substring match)'),
          abort: z
            .boolean()
            .optional()
            .default(false)
            .describe('Set to true to abort the currently running task'),
        },
        async ({ message, instance: query, abort }, extra) => {
          try {
            const { instance } = registry.resolveInstance(query)
            const baseUrl = instance.url

            // Abort mode
            if (abort) {
              const { busySessionId } = await getInstanceStatus(baseUrl)
              if (!busySessionId) {
                return {
                  content: [
                    {
                      type: 'text',
                      text: `No active task on ${instance.name} — nothing to abort.`,
                    },
                  ],
                }
              }
              await fetch(`${baseUrl}/session/${busySessionId}/abort`, {
                method: 'POST',
              })
              return {
                content: [
                  {
                    type: 'text',
                    text: `Aborted active task on ${instance.name}.`,
                  },
                ],
              }
            }

            // Send mode — message is required
            if (!message) {
              return {
                content: [
                  {
                    type: 'text',
                    text: 'Error: message is required when not aborting.',
                  },
                ],
                isError: true,
              }
            }

            // Check if instance is busy
            const { busySessionId } = await getInstanceStatus(baseUrl)
            if (busySessionId) {
              return {
                content: [
                  {
                    type: 'text',
                    text:
                      `${instance.name} is currently busy processing a task. ` +
                      `You can wait for it to finish, or call send with abort=true to stop it.`,
                  },
                ],
              }
            }

            // Find the most recently updated session
            const session = await findMostRecentSession(baseUrl)
            if (!session) {
              return {
                content: [
                  {
                    type: 'text',
                    text: `No sessions found on ${instance.name}. Open opencode and start a session first.`,
                  },
                ],
                isError: true,
              }
            }

            // Submit via session API
            await fetch(`${baseUrl}/session/${session.id}/prompt_async`, {
              method: 'POST',
              headers: { 'Content-Type': 'application/json' },
              body: JSON.stringify({
                parts: [{ type: 'text', text: message }],
              }),
            })

            // Subscribe to SSE and stream deltas back
            const timeout = Number(process.env.SEND_TIMEOUT_MS) || 300_000
            const controller = new AbortController()
            const timer = setTimeout(() => controller.abort(), timeout)
            let fullResponse = ''

            try {
              for await (const event of sseEvents(
                baseUrl,
                controller.signal,
              )) {
                if (
                  event.type === 'message.part.delta' &&
                  event.properties.field === 'text' &&
                  event.properties.sessionID === session.id
                ) {
                  const delta = event.properties.delta as string
                  fullResponse += delta

                  // Stream delta back to MCP client via progress notification
                  if (extra._meta?.progressToken !== undefined) {
                    await extra.sendNotification({
                      method: 'notifications/progress',
                      params: {
                        progressToken: extra._meta.progressToken,
                        progress: fullResponse.length,
                        total: 0,
                        message: delta,
                      },
                    })
                  }
                }

                if (
                  event.type === 'session.idle' &&
                  event.properties.sessionID === session.id
                ) {
                  break
                }
              }
            } catch (err) {
              if ((err as Error).name === 'AbortError') {
                fullResponse +=
                  '\n\n(still processing — timed out waiting for response)'
              } else {
                throw err
              }
            } finally {
              clearTimeout(timer)
              controller.abort()
            }

            if (!fullResponse) {
              return {
                content: [
                  {
                    type: 'text',
                    text: `Message sent to ${instance.name}, but no response received. Check the instance directly.`,
                  },
                ],
              }
            }

            return {
              content: [
                {
                  type: 'text',
                  text: fullResponse,
                },
              ],
            }
          } catch (err) {
            return {
              content: [
                { type: 'text', text: `Error: ${(err as Error).message}` },
              ],
              isError: true,
            }
          }
        },
      )

- src/tools/simplified.ts:175-188 (schema) Input schema for the 'send' tool: optional message string, required instance string, optional abort boolean (default false).

      {
        message: z
          .string()
          .optional()
          .describe('The message to send (not required when aborting)'),
        instance: z
          .string()
          .describe('Instance name (exact or fuzzy substring match)'),
        abort: z
          .boolean()
          .optional()
          .default(false)
          .describe('Set to true to abort the currently running task'),
      },

- src/index.ts:38-38 (registration) Registration entry point: calls registerSimplifiedTools(server, registry) which registers the 'send' tool alongside other tools.

      registerSimplifiedTools(server, registry)

- src/tools/simplified.ts:129-132 (registration) Function signature that registers all simplified tools including 'send' via server.tool().

      export function registerSimplifiedTools(
        server: McpServer,
        registry: InstanceRegistry,
      ): void {

- src/tools/simplified.ts:6-50 (helper) sseEvents helper: async generator subscribing to SSE event stream, used by the 'send' handler to stream response deltas back in real-time.

      /**
       * Subscribe to an opencode instance's SSE event stream.
       * Returns an async iterator of parsed events.
       */
      async function* sseEvents(
        baseUrl: string,
        signal: AbortSignal,
      ): AsyncGenerator<{ type: string; properties: Record<string, unknown> }> {
        const res = await fetch(`${baseUrl}/event`, {
          headers: { Accept: 'text/event-stream' },
          signal,
        })
        if (!res.ok || !res.body) return

        const reader = res.body.getReader()
        const decoder = new TextDecoder()
        let buffer = ''

        try {
          while (true) {
            const { done, value } = await reader.read()
            if (done) break

            buffer += decoder.decode(value, { stream: true })
            const lines = buffer.split('\n')
            buffer = lines.pop() ?? ''

            for (const line of lines) {
              if (line.startsWith('data: ')) {
                const data = line.slice(6).trim()
                if (data) {
                  try {
                    yield JSON.parse(data)
                  } catch {
                    // Skip malformed events
                  }
                }
              }
            }
          }
        } finally {
          reader.releaseLock()
        }
      }
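
The buffering in sseEvents and the delta collection in the handler can be tried without a network connection. The sketch below is a simplified, synchronous restatement under stated assumptions: `parseSseChunk` and `collectText` are hypothetical helpers that do not exist in the source, and the event payloads are made up to match the field names the handler reads (`type`, `properties.field`, `properties.sessionID`, `properties.delta`).

```typescript
type SseEvent = { type: string; properties: Record<string, unknown> }

// Hypothetical pure helper mirroring the line buffering inside sseEvents:
// complete lines are parsed, the trailing partial line is carried over.
function parseSseChunk(
  buffer: string,
  chunk: string,
): { events: SseEvent[]; rest: string } {
  const lines = (buffer + chunk).split('\n')
  const rest = lines.pop() ?? ''
  const events: SseEvent[] = []
  for (const line of lines) {
    if (!line.startsWith('data: ')) continue
    const data = line.slice(6).trim()
    if (!data) continue
    try {
      events.push(JSON.parse(data) as SseEvent)
    } catch {
      // Skip malformed events, as the original helper does
    }
  }
  return { events, rest }
}

// Hypothetical helper: concatenate text deltas for one session and
// report whether a session.idle event for that session was seen.
function collectText(
  events: SseEvent[],
  sessionId: string,
): { text: string; idle: boolean } {
  let text = ''
  for (const event of events) {
    if (event.properties.sessionID !== sessionId) continue
    if (
      event.type === 'message.part.delta' &&
      event.properties.field === 'text'
    ) {
      text += String(event.properties.delta)
    }
    if (event.type === 'session.idle') return { text, idle: true }
  }
  return { text, idle: false }
}
```

Carrying `rest` into the next call is what lets an event that is split across two network reads still parse once its closing newline arrives.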