import { t as agentContext } from "./context-BkKbAa1R.js";
import { t as MessageType } from "./ai-types-DEtF_8Km.js";
import "./client-DjTPRM8-.js";
import "./client-QZa2Rq0l.js";
import "./do-oauth-client-provider-B1fVIshX.js";
import { t as Agent } from "./src-BZDh910Z.js";
import { r as autoTransformMessages } from "./ai-chat-v5-migration-DguhuLKF.js";
import { jsonSchema, tool } from "ai";
import { nanoid } from "nanoid";
//#region src/ai-chat-agent.ts
/**
* Converts client tool schemas to AI SDK tool format.
*
* These tools have no `execute` function - when the AI model calls them,
* the tool call is sent back to the client for execution.
*
* @param clientTools - Array of tool schemas from the client
* @returns Record of AI SDK tools that can be spread into your tools object
*/
function createToolsFromClientSchemas(clientTools) {
if (!clientTools || clientTools.length === 0) return {};
const seenNames = /* @__PURE__ */ new Set();
for (const t of clientTools) {
if (seenNames.has(t.name)) console.warn(`[createToolsFromClientSchemas] Duplicate tool name "${t.name}" found. Later definitions will override earlier ones.`);
seenNames.add(t.name);
}
return Object.fromEntries(clientTools.map((t) => [t.name, tool({
description: t.description ?? "",
inputSchema: jsonSchema(t.parameters ?? { type: "object" })
})]));
}
/** Number of chunks to buffer before flushing to SQLite */
const CHUNK_BUFFER_SIZE = 10;
/** Maximum buffer size to prevent memory issues on rapid reconnections */
const CHUNK_BUFFER_MAX_SIZE = 100;
/** Maximum age for a "streaming" stream before considering it stale (ms) - 5 minutes */
const STREAM_STALE_THRESHOLD_MS = 300 * 1e3;
/** Default cleanup interval for old streams (ms) - every 10 minutes */
const CLEANUP_INTERVAL_MS = 600 * 1e3;
/** Default age threshold for cleaning up completed streams (ms) - 24 hours */
const CLEANUP_AGE_THRESHOLD_MS = 1440 * 60 * 1e3;
const decoder = new TextDecoder();
/**
* Extension of Agent with built-in chat capabilities
* @template Env Environment type containing bindings
*/
var AIChatAgent = class extends Agent {
constructor(ctx, env) {
super(ctx, env);
this._activeStreamId = null;
this._activeRequestId = null;
this._streamingMessage = null;
this._streamCompletionPromise = null;
this._streamCompletionResolve = null;
this._streamChunkIndex = 0;
this._chunkBuffer = [];
this._isFlushingChunks = false;
this._lastCleanupTime = 0;
this.sql`create table if not exists cf_ai_chat_agent_messages (
id text primary key,
message text not null,
created_at datetime default current_timestamp
)`;
this.sql`create table if not exists cf_ai_chat_stream_chunks (
id text primary key,
stream_id text not null,
body text not null,
chunk_index integer not null,
created_at integer not null
)`;
this.sql`create table if not exists cf_ai_chat_stream_metadata (
id text primary key,
request_id text not null,
status text not null,
created_at integer not null,
completed_at integer
)`;
this.sql`create index if not exists idx_stream_chunks_stream_id
on cf_ai_chat_stream_chunks(stream_id, chunk_index)`;
this.messages = autoTransformMessages(this._loadMessagesFromDb());
this._chatMessageAbortControllers = /* @__PURE__ */ new Map();
this._restoreActiveStream();
const _onConnect = this.onConnect.bind(this);
this.onConnect = async (connection, ctx$1) => {
if (this._activeStreamId) this._notifyStreamResuming(connection);
return _onConnect(connection, ctx$1);
};
const _onMessage = this.onMessage.bind(this);
this.onMessage = async (connection, message) => {
if (typeof message === "string") {
let data;
try {
data = JSON.parse(message);
} catch (_error) {
return _onMessage(connection, message);
}
if (data.type === MessageType.CF_AGENT_USE_CHAT_REQUEST && data.init.method === "POST") {
const { body } = data.init;
const { messages, clientTools } = JSON.parse(body);
const transformedMessages = autoTransformMessages(messages);
this._broadcastChatMessage({
messages: transformedMessages,
type: MessageType.CF_AGENT_CHAT_MESSAGES
}, [connection.id]);
await this.persistMessages(transformedMessages, [connection.id]);
this.observability?.emit({
displayMessage: "Chat message request",
id: data.id,
payload: {},
timestamp: Date.now(),
type: "message:request"
}, this.ctx);
const chatMessageId = data.id;
const abortSignal = this._getAbortSignal(chatMessageId);
return this._tryCatchChat(async () => {
return agentContext.run({
agent: this,
connection,
request: void 0,
email: void 0
}, async () => {
const response = await this.onChatMessage(async (_finishResult) => {
this._removeAbortController(chatMessageId);
this.observability?.emit({
displayMessage: "Chat message response",
id: data.id,
payload: {},
timestamp: Date.now(),
type: "message:response"
}, this.ctx);
}, {
abortSignal,
clientTools
});
if (response) await this._reply(data.id, response, [connection.id]);
else {
console.warn(`[AIChatAgent] onChatMessage returned no response for chatMessageId: ${chatMessageId}`);
this._broadcastChatMessage({
body: "No response was generated by the agent.",
done: true,
id: data.id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
}, [connection.id]);
}
});
});
}
if (data.type === MessageType.CF_AGENT_CHAT_CLEAR) {
this._destroyAbortControllers();
this.sql`delete from cf_ai_chat_agent_messages`;
this.sql`delete from cf_ai_chat_stream_chunks`;
this.sql`delete from cf_ai_chat_stream_metadata`;
this._activeStreamId = null;
this._activeRequestId = null;
this._streamChunkIndex = 0;
this.messages = [];
this._broadcastChatMessage({ type: MessageType.CF_AGENT_CHAT_CLEAR }, [connection.id]);
return;
}
if (data.type === MessageType.CF_AGENT_CHAT_MESSAGES) {
const transformedMessages = autoTransformMessages(data.messages);
await this.persistMessages(transformedMessages, [connection.id]);
return;
}
if (data.type === MessageType.CF_AGENT_CHAT_REQUEST_CANCEL) {
this._cancelChatRequest(data.id);
return;
}
if (data.type === MessageType.CF_AGENT_STREAM_RESUME_ACK) {
if (this._activeStreamId && this._activeRequestId && this._activeRequestId === data.id) this._sendStreamChunks(connection, this._activeStreamId, this._activeRequestId);
return;
}
if (data.type === MessageType.CF_AGENT_TOOL_RESULT) {
const { toolCallId, toolName, output, autoContinue } = data;
this._applyToolResult(toolCallId, toolName, output).then((applied) => {
if (applied && autoContinue) {
const waitForStream = async () => {
if (this._streamCompletionPromise) await this._streamCompletionPromise;
else await new Promise((resolve) => setTimeout(resolve, 500));
};
waitForStream().then(() => {
const continuationId = nanoid();
const abortSignal = this._getAbortSignal(continuationId);
this._tryCatchChat(async () => {
return agentContext.run({
agent: this,
connection,
request: void 0,
email: void 0
}, async () => {
const response = await this.onChatMessage(async (_finishResult) => {
this._removeAbortController(continuationId);
this.observability?.emit({
displayMessage: "Chat message response (tool continuation)",
id: continuationId,
payload: {},
timestamp: Date.now(),
type: "message:response"
}, this.ctx);
}, { abortSignal });
if (response) await this._reply(continuationId, response, [], { continuation: true });
});
});
});
}
});
return;
}
}
return _onMessage(connection, message);
};
}
/**
* Restore active stream state if the agent was restarted during streaming.
* Called during construction to recover any interrupted streams.
* Validates stream freshness to avoid sending stale resume notifications.
* @internal Protected for testing purposes.
*/
_restoreActiveStream() {
const activeStreams = this.sql`
select * from cf_ai_chat_stream_metadata
where status = 'streaming'
order by created_at desc
limit 1
`;
if (activeStreams && activeStreams.length > 0) {
const stream = activeStreams[0];
const streamAge = Date.now() - stream.created_at;
if (streamAge > STREAM_STALE_THRESHOLD_MS) {
this.sql`delete from cf_ai_chat_stream_chunks where stream_id = ${stream.id}`;
this.sql`delete from cf_ai_chat_stream_metadata where id = ${stream.id}`;
console.warn(`[AIChatAgent] Deleted stale stream ${stream.id} (age: ${Math.round(streamAge / 1e3)}s)`);
return;
}
this._activeStreamId = stream.id;
this._activeRequestId = stream.request_id;
const lastChunk = this.sql`
select max(chunk_index) as max_index
from cf_ai_chat_stream_chunks
where stream_id = ${this._activeStreamId}
`;
this._streamChunkIndex = lastChunk && lastChunk[0]?.max_index != null ? lastChunk[0].max_index + 1 : 0;
}
}
/**
* Notify a connection about an active stream that can be resumed.
* The client should respond with CF_AGENT_STREAM_RESUME_ACK to receive chunks.
* Uses in-memory state for request ID - no extra DB lookup needed.
* @param connection - The WebSocket connection to notify
*/
_notifyStreamResuming(connection) {
if (!this._activeStreamId || !this._activeRequestId) return;
connection.send(JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUMING,
id: this._activeRequestId
}));
}
/**
* Send stream chunks to a connection after receiving ACK.
* @param connection - The WebSocket connection
* @param streamId - The stream to replay
* @param requestId - The original request ID
*/
_sendStreamChunks(connection, streamId, requestId) {
this._flushChunkBuffer();
const chunks = this.sql`
select * from cf_ai_chat_stream_chunks
where stream_id = ${streamId}
order by chunk_index asc
`;
for (const chunk of chunks || []) connection.send(JSON.stringify({
body: chunk.body,
done: false,
id: requestId,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
}));
if (this._activeStreamId !== streamId) connection.send(JSON.stringify({
body: "",
done: true,
id: requestId,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
}));
}
/**
* Buffer a stream chunk for batch write to SQLite.
* @param streamId - The stream this chunk belongs to
* @param body - The serialized chunk body
* @internal Protected for testing purposes.
*/
_storeStreamChunk(streamId, body) {
if (this._chunkBuffer.length >= CHUNK_BUFFER_MAX_SIZE) this._flushChunkBuffer();
this._chunkBuffer.push({
id: nanoid(),
streamId,
body,
index: this._streamChunkIndex
});
this._streamChunkIndex++;
if (this._chunkBuffer.length >= CHUNK_BUFFER_SIZE) this._flushChunkBuffer();
}
/**
* Flush buffered chunks to SQLite in a single batch.
* Uses a lock to prevent concurrent flush operations.
* @internal Protected for testing purposes.
*/
_flushChunkBuffer() {
if (this._isFlushingChunks || this._chunkBuffer.length === 0) return;
this._isFlushingChunks = true;
try {
const chunks = this._chunkBuffer;
this._chunkBuffer = [];
const now = Date.now();
for (const chunk of chunks) this.sql`
insert into cf_ai_chat_stream_chunks (id, stream_id, body, chunk_index, created_at)
values (${chunk.id}, ${chunk.streamId}, ${chunk.body}, ${chunk.index}, ${now})
`;
} finally {
this._isFlushingChunks = false;
}
}
/**
* Start tracking a new stream for resumable streaming.
* Creates metadata entry in SQLite and sets up tracking state.
* @param requestId - The unique ID of the chat request
* @returns The generated stream ID
* @internal Protected for testing purposes.
*/
_startStream(requestId) {
this._flushChunkBuffer();
const streamId = nanoid();
this._activeStreamId = streamId;
this._activeRequestId = requestId;
this._streamChunkIndex = 0;
this.sql`
insert into cf_ai_chat_stream_metadata (id, request_id, status, created_at)
values (${streamId}, ${requestId}, 'streaming', ${Date.now()})
`;
return streamId;
}
/**
* Mark a stream as completed and flush any pending chunks.
* @param streamId - The stream to mark as completed
* @internal Protected for testing purposes.
*/
_completeStream(streamId) {
this._flushChunkBuffer();
this.sql`
update cf_ai_chat_stream_metadata
set status = 'completed', completed_at = ${Date.now()}
where id = ${streamId}
`;
this._activeStreamId = null;
this._activeRequestId = null;
this._streamChunkIndex = 0;
this._maybeCleanupOldStreams();
}
/**
* Clean up old completed streams if enough time has passed since last cleanup.
* This prevents database growth while avoiding cleanup overhead on every stream completion.
*/
_maybeCleanupOldStreams() {
const now = Date.now();
if (now - this._lastCleanupTime < CLEANUP_INTERVAL_MS) return;
this._lastCleanupTime = now;
const cutoff = now - CLEANUP_AGE_THRESHOLD_MS;
this.sql`
delete from cf_ai_chat_stream_chunks
where stream_id in (
select id from cf_ai_chat_stream_metadata
where status = 'completed' and completed_at < ${cutoff}
)
`;
this.sql`
delete from cf_ai_chat_stream_metadata
where status = 'completed' and completed_at < ${cutoff}
`;
}
_broadcastChatMessage(message, exclude) {
this.broadcast(JSON.stringify(message), exclude);
}
_loadMessagesFromDb() {
return (this.sql`select * from cf_ai_chat_agent_messages order by created_at` || []).map((row) => {
try {
return JSON.parse(row.message);
} catch (error) {
console.error(`Failed to parse message ${row.id}:`, error);
return null;
}
}).filter((msg) => msg !== null);
}
async onRequest(request) {
return this._tryCatchChat(async () => {
if (new URL(request.url).pathname.endsWith("/get-messages")) {
const messages = this._loadMessagesFromDb();
return Response.json(messages);
}
return super.onRequest(request);
});
}
async _tryCatchChat(fn) {
try {
return await fn();
} catch (e) {
throw this.onError(e);
}
}
/**
* Handle incoming chat messages and generate a response
* @param onFinish Callback to be called when the response is finished
* @param options Options including abort signal and client-defined tools
* @returns Response to send to the client or undefined
*/
async onChatMessage(onFinish, options) {
throw new Error("recieved a chat message, override onChatMessage and return a Response to send to the client");
}
/**
* Save messages on the server side
* @param messages Chat messages to save
*/
async saveMessages(messages) {
await this.persistMessages(messages);
await this._tryCatchChat(async () => {
const response = await this.onChatMessage(() => {});
if (response) this._reply(crypto.randomUUID(), response);
});
}
async persistMessages(messages, excludeBroadcastIds = []) {
const mergedMessages = this._mergeIncomingWithServerState(messages);
for (const message of mergedMessages) {
const sanitizedMessage = this._sanitizeMessageForPersistence(message);
const messageToSave = this._resolveMessageForToolMerge(sanitizedMessage);
this.sql`
insert into cf_ai_chat_agent_messages (id, message)
values (${messageToSave.id}, ${JSON.stringify(messageToSave)})
on conflict(id) do update set message = excluded.message
`;
}
this.messages = autoTransformMessages(this._loadMessagesFromDb());
this._broadcastChatMessage({
messages: mergedMessages,
type: MessageType.CF_AGENT_CHAT_MESSAGES
}, excludeBroadcastIds);
}
/**
* Merges incoming messages with existing server state.
* This preserves tool outputs that the server has (via _applyToolResult)
* but the client doesn't have yet.
*
* @param incomingMessages - Messages from the client
* @returns Messages with server's tool outputs preserved
*/
_mergeIncomingWithServerState(incomingMessages) {
const serverToolOutputs = /* @__PURE__ */ new Map();
for (const msg of this.messages) {
if (msg.role !== "assistant") continue;
for (const part of msg.parts) if ("toolCallId" in part && "state" in part && part.state === "output-available" && "output" in part) serverToolOutputs.set(part.toolCallId, part.output);
}
if (serverToolOutputs.size === 0) return incomingMessages;
return incomingMessages.map((msg) => {
if (msg.role !== "assistant") return msg;
let hasChanges = false;
const updatedParts = msg.parts.map((part) => {
if ("toolCallId" in part && "state" in part && part.state === "input-available" && serverToolOutputs.has(part.toolCallId)) {
hasChanges = true;
return {
...part,
state: "output-available",
output: serverToolOutputs.get(part.toolCallId)
};
}
return part;
});
return hasChanges ? {
...msg,
parts: updatedParts
} : msg;
});
}
/**
* Resolves a message for persistence, handling tool result merging.
* If the message contains tool parts with output-available state, checks if there's
* an existing message with the same toolCallId that should be updated instead of
* creating a duplicate. This prevents the "Duplicate item found" error from OpenAI
* when client-side tool results arrive in a new request.
*
* @param message - The message to potentially merge
* @returns The message with the correct ID (either original or merged)
*/
_resolveMessageForToolMerge(message) {
if (message.role !== "assistant") return message;
for (const part of message.parts) if ("toolCallId" in part && "state" in part && part.state === "output-available") {
const toolCallId = part.toolCallId;
const existingMessage = this._findMessageByToolCallId(toolCallId);
if (existingMessage && existingMessage.id !== message.id) return {
...message,
id: existingMessage.id
};
}
return message;
}
/**
* Finds an existing assistant message that contains a tool part with the given toolCallId.
* Used to detect when a tool result should update an existing message rather than
* creating a new one.
*
* @param toolCallId - The tool call ID to search for
* @returns The existing message if found, undefined otherwise
*/
_findMessageByToolCallId(toolCallId) {
for (const msg of this.messages) {
if (msg.role !== "assistant") continue;
for (const part of msg.parts) if ("toolCallId" in part && part.toolCallId === toolCallId) return msg;
}
}
/**
* Sanitizes a message for persistence by removing ephemeral provider-specific
* data that should not be stored or sent back in subsequent requests.
*
* This handles two issues with the OpenAI Responses API:
*
* 1. **Duplicate item IDs**: The AI SDK's @ai-sdk/openai provider (v2.0.x+)
* defaults to using OpenAI's Responses API which assigns unique itemIds
* to each message part. When these IDs are persisted and sent back,
* OpenAI rejects them as duplicates.
*
* 2. **Empty reasoning parts**: OpenAI may return reasoning parts with empty
* text and encrypted content. These cause "Non-OpenAI reasoning parts are
* not supported" warnings when sent back via convertToModelMessages().
*
* @param message - The message to sanitize
* @returns A new message with ephemeral provider data removed
*/
_sanitizeMessageForPersistence(message) {
const sanitizedParts = message.parts.filter((part) => {
if (part.type === "reasoning") {
const reasoningPart = part;
if (!reasoningPart.text || reasoningPart.text.trim() === "") return false;
}
return true;
}).map((part) => {
let sanitizedPart = part;
if ("providerMetadata" in sanitizedPart && sanitizedPart.providerMetadata && typeof sanitizedPart.providerMetadata === "object" && "openai" in sanitizedPart.providerMetadata) sanitizedPart = this._stripOpenAIMetadata(sanitizedPart, "providerMetadata");
if ("callProviderMetadata" in sanitizedPart && sanitizedPart.callProviderMetadata && typeof sanitizedPart.callProviderMetadata === "object" && "openai" in sanitizedPart.callProviderMetadata) sanitizedPart = this._stripOpenAIMetadata(sanitizedPart, "callProviderMetadata");
return sanitizedPart;
});
return {
...message,
parts: sanitizedParts
};
}
/**
* Helper to strip OpenAI-specific ephemeral fields from a metadata object.
* Removes itemId and reasoningEncryptedContent while preserving other fields.
*/
_stripOpenAIMetadata(part, metadataKey) {
const metadata = part[metadataKey];
if (!metadata?.openai) return part;
const { itemId: _itemId, reasoningEncryptedContent: _rec, ...restOpenai } = metadata.openai;
const hasOtherOpenaiFields = Object.keys(restOpenai).length > 0;
const { openai: _openai, ...restMetadata } = metadata;
let newMetadata;
if (hasOtherOpenaiFields) newMetadata = {
...restMetadata,
openai: restOpenai
};
else if (Object.keys(restMetadata).length > 0) newMetadata = restMetadata;
const { [metadataKey]: _oldMeta, ...restPart } = part;
if (newMetadata) return {
...restPart,
[metadataKey]: newMetadata
};
return restPart;
}
/**
* Applies a tool result to an existing assistant message.
* This is used when the client sends CF_AGENT_TOOL_RESULT for client-side tools.
* The server is the source of truth, so we update the message here and broadcast
* the update to all clients.
*
* @param toolCallId - The tool call ID this result is for
* @param toolName - The name of the tool
* @param output - The output from the tool execution
* @returns true if the result was applied, false if the message was not found
*/
async _applyToolResult(toolCallId, _toolName, output) {
let message;
if (this._streamingMessage) {
for (const part of this._streamingMessage.parts) if ("toolCallId" in part && part.toolCallId === toolCallId) {
message = this._streamingMessage;
break;
}
}
if (!message) for (let attempt = 0; attempt < 10; attempt++) {
message = this._findMessageByToolCallId(toolCallId);
if (message) break;
await new Promise((resolve) => setTimeout(resolve, 100));
}
if (!message) {
console.warn(`[AIChatAgent] _applyToolResult: Could not find message with toolCallId ${toolCallId} after retries`);
return false;
}
const isStreamingMessage = message === this._streamingMessage;
let updated = false;
if (isStreamingMessage) {
for (const part of message.parts) if ("toolCallId" in part && part.toolCallId === toolCallId && "state" in part && part.state === "input-available") {
part.state = "output-available";
part.output = output;
updated = true;
break;
}
} else {
const updatedParts = message.parts.map((part) => {
if ("toolCallId" in part && part.toolCallId === toolCallId && "state" in part && part.state === "input-available") {
updated = true;
return {
...part,
state: "output-available",
output
};
}
return part;
});
if (updated) {
const updatedMessage = this._sanitizeMessageForPersistence({
...message,
parts: updatedParts
});
this.sql`
update cf_ai_chat_agent_messages
set message = ${JSON.stringify(updatedMessage)}
where id = ${message.id}
`;
this.messages = autoTransformMessages(this._loadMessagesFromDb());
}
}
if (!updated) {
console.warn(`[AIChatAgent] _applyToolResult: Tool part with toolCallId ${toolCallId} not in input-available state`);
return false;
}
if (!isStreamingMessage) {
const broadcastMessage = this._findMessageByToolCallId(toolCallId);
if (broadcastMessage) this._broadcastChatMessage({
type: MessageType.CF_AGENT_MESSAGE_UPDATED,
message: broadcastMessage
});
}
return true;
}
async _reply(id, response, excludeBroadcastIds = [], options = {}) {
const { continuation = false } = options;
return this._tryCatchChat(async () => {
if (!response.body) {
this._broadcastChatMessage({
body: "",
done: true,
id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
...continuation && { continuation: true }
});
return;
}
const streamId = this._startStream(id);
const { getToolName: getToolName$1, isToolUIPart: isToolUIPart$1, parsePartialJson } = await import("ai");
const reader = response.body.getReader();
const message = {
id: `assistant_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
role: "assistant",
parts: []
};
this._streamingMessage = message;
this._streamCompletionPromise = new Promise((resolve) => {
this._streamCompletionResolve = resolve;
});
let activeTextParts = {};
let activeReasoningParts = {};
const partialToolCalls = {};
function updateDynamicToolPart(options$1) {
const part = message.parts.find((part$1) => part$1.type === "dynamic-tool" && part$1.toolCallId === options$1.toolCallId);
const anyOptions = options$1;
const anyPart = part;
if (part != null) {
part.state = options$1.state;
anyPart.toolName = options$1.toolName;
anyPart.input = anyOptions.input;
anyPart.output = anyOptions.output;
anyPart.errorText = anyOptions.errorText;
anyPart.rawInput = anyOptions.rawInput ?? anyPart.rawInput;
anyPart.preliminary = anyOptions.preliminary;
if (anyOptions.providerMetadata != null && part.state === "input-available") part.callProviderMetadata = anyOptions.providerMetadata;
} else message.parts.push({
type: "dynamic-tool",
toolName: options$1.toolName,
toolCallId: options$1.toolCallId,
state: options$1.state,
input: anyOptions.input,
output: anyOptions.output,
errorText: anyOptions.errorText,
preliminary: anyOptions.preliminary,
...anyOptions.providerMetadata != null ? { callProviderMetadata: anyOptions.providerMetadata } : {}
});
}
function updateToolPart(options$1) {
const part = message.parts.find((part$1) => isToolUIPart$1(part$1) && part$1.toolCallId === options$1.toolCallId);
const anyOptions = options$1;
const anyPart = part;
if (part != null) {
part.state = options$1.state;
anyPart.input = anyOptions.input;
anyPart.output = anyOptions.output;
anyPart.errorText = anyOptions.errorText;
anyPart.rawInput = anyOptions.rawInput;
anyPart.preliminary = anyOptions.preliminary;
anyPart.providerExecuted = anyOptions.providerExecuted ?? part.providerExecuted;
if (anyOptions.providerMetadata != null && part.state === "input-available") part.callProviderMetadata = anyOptions.providerMetadata;
} else message.parts.push({
type: `tool-${options$1.toolName}`,
toolCallId: options$1.toolCallId,
state: options$1.state,
input: anyOptions.input,
output: anyOptions.output,
rawInput: anyOptions.rawInput,
errorText: anyOptions.errorText,
providerExecuted: anyOptions.providerExecuted,
preliminary: anyOptions.preliminary,
...anyOptions.providerMetadata != null ? { callProviderMetadata: anyOptions.providerMetadata } : {}
});
}
async function updateMessageMetadata(metadata) {
if (metadata != null) message.metadata = message.metadata != null ? {
...message.metadata,
...metadata
} : metadata;
}
let streamCompleted = false;
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
this._completeStream(streamId);
streamCompleted = true;
this._broadcastChatMessage({
body: "",
done: true,
id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
...continuation && { continuation: true }
});
break;
}
const chunk = decoder.decode(value);
if ((response.headers.get("content-type") || "").includes("text/event-stream")) {
const lines = chunk.split("\n");
for (const line of lines) if (line.startsWith("data: ") && line !== "data: [DONE]") try {
const data = JSON.parse(line.slice(6));
switch (data.type) {
case "text-start": {
const textPart = {
type: "text",
text: "",
providerMetadata: data.providerMetadata,
state: "streaming"
};
activeTextParts[data.id] = textPart;
message.parts.push(textPart);
break;
}
case "text-delta": {
const textPart = activeTextParts[data.id];
textPart.text += data.delta;
textPart.providerMetadata = data.providerMetadata ?? textPart.providerMetadata;
break;
}
case "text-end": {
const textPart = activeTextParts[data.id];
textPart.state = "done";
textPart.providerMetadata = data.providerMetadata ?? textPart.providerMetadata;
delete activeTextParts[data.id];
break;
}
case "reasoning-start": {
const reasoningPart = {
type: "reasoning",
text: "",
providerMetadata: data.providerMetadata,
state: "streaming"
};
activeReasoningParts[data.id] = reasoningPart;
message.parts.push(reasoningPart);
break;
}
case "reasoning-delta": {
const reasoningPart = activeReasoningParts[data.id];
reasoningPart.text += data.delta;
reasoningPart.providerMetadata = data.providerMetadata ?? reasoningPart.providerMetadata;
break;
}
case "reasoning-end": {
const reasoningPart = activeReasoningParts[data.id];
reasoningPart.providerMetadata = data.providerMetadata ?? reasoningPart.providerMetadata;
reasoningPart.state = "done";
delete activeReasoningParts[data.id];
break;
}
case "file":
message.parts.push({
type: "file",
mediaType: data.mediaType,
url: data.url
});
break;
case "source-url":
message.parts.push({
type: "source-url",
sourceId: data.sourceId,
url: data.url,
title: data.title,
providerMetadata: data.providerMetadata
});
break;
case "source-document":
message.parts.push({
type: "source-document",
sourceId: data.sourceId,
mediaType: data.mediaType,
title: data.title,
filename: data.filename,
providerMetadata: data.providerMetadata
});
break;
case "tool-input-start": {
const toolInvocations = message.parts.filter(isToolUIPart$1);
partialToolCalls[data.toolCallId] = {
text: "",
toolName: data.toolName,
index: toolInvocations.length,
dynamic: data.dynamic
};
if (data.dynamic) updateDynamicToolPart({
toolCallId: data.toolCallId,
toolName: data.toolName,
state: "input-streaming",
input: void 0
});
else updateToolPart({
toolCallId: data.toolCallId,
toolName: data.toolName,
state: "input-streaming",
input: void 0
});
break;
}
case "tool-input-delta": {
const partialToolCall = partialToolCalls[data.toolCallId];
partialToolCall.text += data.inputTextDelta;
const partialArgs = (await parsePartialJson(partialToolCall.text)).value;
if (partialToolCall.dynamic) updateDynamicToolPart({
toolCallId: data.toolCallId,
toolName: partialToolCall.toolName,
state: "input-streaming",
input: partialArgs
});
else updateToolPart({
toolCallId: data.toolCallId,
toolName: partialToolCall.toolName,
state: "input-streaming",
input: partialArgs
});
break;
}
case "tool-input-available":
if (data.dynamic) updateDynamicToolPart({
toolCallId: data.toolCallId,
toolName: data.toolName,
state: "input-available",
input: data.input,
providerMetadata: data.providerMetadata
});
else updateToolPart({
toolCallId: data.toolCallId,
toolName: data.toolName,
state: "input-available",
input: data.input,
providerExecuted: data.providerExecuted,
providerMetadata: data.providerMetadata
});
break;
case "tool-input-error":
if (data.dynamic) updateDynamicToolPart({
toolCallId: data.toolCallId,
toolName: data.toolName,
state: "output-error",
input: data.input,
errorText: data.errorText,
providerMetadata: data.providerMetadata
});
else updateToolPart({
toolCallId: data.toolCallId,
toolName: data.toolName,
state: "output-error",
input: void 0,
rawInput: data.input,
errorText: data.errorText,
providerExecuted: data.providerExecuted,
providerMetadata: data.providerMetadata
});
break;
case "tool-output-available":
if (data.dynamic) {
const toolInvocation = message.parts.filter((part) => part.type === "dynamic-tool").find((invocation) => invocation.toolCallId === data.toolCallId);
if (!toolInvocation) throw new Error("Tool invocation not found");
updateDynamicToolPart({
toolCallId: data.toolCallId,
toolName: toolInvocation.toolName,
state: "output-available",
input: toolInvocation.input,
output: data.output,
preliminary: data.preliminary
});
} else {
const toolInvocation = message.parts.filter(isToolUIPart$1).find((invocation) => invocation.toolCallId === data.toolCallId);
if (!toolInvocation) throw new Error("Tool invocation not found");
updateToolPart({
toolCallId: data.toolCallId,
toolName: getToolName$1(toolInvocation),
state: "output-available",
input: toolInvocation.input,
output: data.output,
providerExecuted: data.providerExecuted,
preliminary: data.preliminary
});
}
break;
case "tool-output-error":
if (data.dynamic) {
const toolInvocation = message.parts.filter((part) => part.type === "dynamic-tool").find((invocation) => invocation.toolCallId === data.toolCallId);
if (!toolInvocation) throw new Error("Tool invocation not found");
updateDynamicToolPart({
toolCallId: data.toolCallId,
toolName: toolInvocation.toolName,
state: "output-error",
input: toolInvocation.input,
errorText: data.errorText
});
} else {
const toolInvocation = message.parts.filter(isToolUIPart$1).find((invocation) => invocation.toolCallId === data.toolCallId);
if (!toolInvocation) throw new Error("Tool invocation not found");
updateToolPart({
toolCallId: data.toolCallId,
toolName: getToolName$1(toolInvocation),
state: "output-error",
input: toolInvocation.input,
rawInput: "rawInput" in toolInvocation ? toolInvocation.rawInput : void 0,
errorText: data.errorText
});
}
break;
case "start-step":
message.parts.push({ type: "step-start" });
break;
case "finish-step":
activeTextParts = {};
activeReasoningParts = {};
break;
case "start":
if (data.messageId != null) message.id = data.messageId;
await updateMessageMetadata(data.messageMetadata);
break;
case "finish":
await updateMessageMetadata(data.messageMetadata);
break;
case "message-metadata":
await updateMessageMetadata(data.messageMetadata);
break;
case "error":
this._broadcastChatMessage({
error: true,
body: data.errorText ?? JSON.stringify(data),
done: false,
id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
});
break;
}
let eventToSend = data;
if (data.type === "finish" && "finishReason" in data) {
const { finishReason, ...rest } = data;
eventToSend = {
...rest,
type: "finish",
messageMetadata: { finishReason }
};
}
const chunkBody = JSON.stringify(eventToSend);
this._storeStreamChunk(streamId, chunkBody);
this._broadcastChatMessage({
body: chunkBody,
done: false,
id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
...continuation && { continuation: true }
});
} catch (_error) {}
} else if (chunk.length > 0) {
message.parts.push({
type: "text",
text: chunk
});
const chunkBody = JSON.stringify({
type: "text-delta",
delta: chunk
});
this._storeStreamChunk(streamId, chunkBody);
this._broadcastChatMessage({
body: chunkBody,
done: false,
id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
...continuation && { continuation: true }
});
}
}
} catch (error) {
if (!streamCompleted) {
this._markStreamError(streamId);
this._broadcastChatMessage({
body: error instanceof Error ? error.message : "Stream error",
done: true,
error: true,
id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
...continuation && { continuation: true }
});
}
throw error;
} finally {
reader.releaseLock();
}
if (message.parts.length > 0) if (continuation) {
let lastAssistantIdx = -1;
for (let i = this.messages.length - 1; i >= 0; i--) if (this.messages[i].role === "assistant") {
lastAssistantIdx = i;
break;
}
if (lastAssistantIdx >= 0) {
const lastAssistant = this.messages[lastAssistantIdx];
const mergedMessage = {
...lastAssistant,
parts: [...lastAssistant.parts, ...message.parts]
};
const updatedMessages = [...this.messages];
updatedMessages[lastAssistantIdx] = mergedMessage;
await this.persistMessages(updatedMessages, excludeBroadcastIds);
} else await this.persistMessages([...this.messages, message], excludeBroadcastIds);
} else await this.persistMessages([...this.messages, message], excludeBroadcastIds);
this._streamingMessage = null;
if (this._streamCompletionResolve) {
this._streamCompletionResolve();
this._streamCompletionResolve = null;
this._streamCompletionPromise = null;
}
});
}
/**
* Mark a stream as errored and clean up state.
* @param streamId - The stream to mark as errored
* @internal Protected for testing purposes.
*/
_markStreamError(streamId) {
this._flushChunkBuffer();
this.sql`
update cf_ai_chat_stream_metadata
set status = 'error', completed_at = ${Date.now()}
where id = ${streamId}
`;
this._activeStreamId = null;
this._activeRequestId = null;
this._streamChunkIndex = 0;
}
/**
* For the given message id, look up its associated AbortController
* If the AbortController does not exist, create and store one in memory
*
* returns the AbortSignal associated with the AbortController
*/
_getAbortSignal(id) {
if (typeof id !== "string") return;
if (!this._chatMessageAbortControllers.has(id)) this._chatMessageAbortControllers.set(id, new AbortController());
return this._chatMessageAbortControllers.get(id)?.signal;
}
/**
* Remove an abort controller from the cache of pending message responses
*/
_removeAbortController(id) {
this._chatMessageAbortControllers.delete(id);
}
/**
* Propagate an abort signal for any requests associated with the given message id
*/
_cancelChatRequest(id) {
if (this._chatMessageAbortControllers.has(id)) this._chatMessageAbortControllers.get(id)?.abort();
}
/**
* Abort all pending requests and clear the cache of AbortControllers
*/
_destroyAbortControllers() {
for (const controller of this._chatMessageAbortControllers.values()) controller?.abort();
this._chatMessageAbortControllers.clear();
}
/**
* When the DO is destroyed, cancel all pending requests and clean up resources
*/
async destroy() {
this._destroyAbortControllers();
this._flushChunkBuffer();
this.sql`drop table if exists cf_ai_chat_stream_chunks`;
this.sql`drop table if exists cf_ai_chat_stream_metadata`;
this._activeStreamId = null;
this._activeRequestId = null;
await super.destroy();
}
};
//#endregion
export { AIChatAgent, createToolsFromClientSchemas };
//# sourceMappingURL=ai-chat-agent.js.map