import { t as agentContext } from "./context-BkKbAa1R.js";
import { t as MessageType } from "./ai-types-DEtF_8Km.js";
import { r as camelCaseToKebabCase } from "./client-DjTPRM8-.js";
import { i as DisposableStore, r as MCPConnectionState, t as MCPClientManager } from "./client-QZa2Rq0l.js";
import { t as DurableObjectOAuthClientProvider } from "./do-oauth-client-provider-B1fVIshX.js";
import { parseCronExpression } from "cron-schedule";
import { nanoid } from "nanoid";
import { EmailMessage } from "cloudflare:email";
import { Server, getServerByName, routePartykitRequest } from "partyserver";
//#region src/observability/index.ts
/**
 * Console-backed observability sink.
 * In local mode only the event's human-readable `displayMessage` is printed;
 * otherwise the whole event object is logged.
 */
const genericObservability = {
  emit(event) {
    console.log(isLocalMode() ? event.displayMessage : event);
  }
};
// Sticky flag: once a request to "localhost" has been seen, local mode stays on.
let localMode = false;
/**
 * Reports whether the agent appears to be running locally, judged by the
 * hostname of the request in the current agent context.
 * @returns true once any request to "localhost" has been observed, false otherwise
 */
function isLocalMode() {
  if (!localMode) {
    const { request } = getCurrentAgent();
    if (!request) return false;
    localMode = new URL(request.url).hostname === "localhost";
  }
  return localMode;
}
//#endregion
//#region src/index.ts
/**
 * Type guard for RPC request messages: an object whose `type` is MessageType.RPC,
 * with string `id` and `method` fields and an array `args` field.
 */
function isRPCRequest(msg) {
  if (typeof msg !== "object" || msg === null) return false;
  if (!("type" in msg) || msg.type !== MessageType.RPC) return false;
  if (!("id" in msg) || typeof msg.id !== "string") return false;
  if (!("method" in msg) || typeof msg.method !== "string") return false;
  return "args" in msg && Array.isArray(msg.args);
}
/**
 * Type guard for state update messages: an object whose `type` is
 * MessageType.CF_AGENT_STATE and that carries a `state` property.
 */
function isStateUpdateMessage(msg) {
  if (typeof msg !== "object" || msg === null) return false;
  if (!("type" in msg) || msg.type !== MessageType.CF_AGENT_STATE) return false;
  return "state" in msg;
}
// Registry of functions marked as client-callable, keyed by the function object itself; the value is the metadata passed to the decorator.
const callableMetadata = /* @__PURE__ */ new Map();
/**
 * Decorator factory that marks a method as callable by clients.
 * The first registration for a given function wins; decorating the same
 * function again leaves its stored metadata untouched.
 * @param metadata Optional metadata about the callable method
 * @returns A decorator that records the metadata and returns the method unchanged
 */
function callable(metadata = {}) {
  return function callableDecorator(target, context) {
    const alreadyRegistered = callableMetadata.has(target);
    if (!alreadyRegistered) {
      callableMetadata.set(target, metadata);
    }
    return target;
  };
}
// Ensures the deprecation warning below is printed at most once.
let didWarnAboutUnstableCallable = false;
/**
 * Decorator that marks a method as callable by clients
 * @deprecated this has been renamed to callable, and unstable_callable will be removed in the next major version
 * @param metadata Optional metadata about the callable method
 * @returns The decorator produced by `callable(metadata)`
 */
const unstable_callable = (metadata = {}) => {
  if (!didWarnAboutUnstableCallable) {
    didWarnAboutUnstableCallable = true;
    console.warn("unstable_callable is deprecated, use callable instead. unstable_callable will be removed in the next major version.");
  }
  // The decorator must be handed back to the caller; without the return,
  // `@unstable_callable()` evaluates to undefined and never registers the method.
  return callable(metadata);
};
/**
 * Computes the next occurrence of a cron expression.
 * @param cron Cron expression understood by cron-schedule's parseCronExpression
 * @returns Whatever `getNextDate()` yields for the parsed expression
 */
function getNextCronTime(cron) {
  const parsed = parseCronExpression(cron);
  return parsed.getNextDate();
}
// Row ids used in cf_agents_state: one row stores the serialized state, the other a flag written on every state update.
const STATE_ROW_ID = "cf_state_row_id";
const STATE_WAS_CHANGED = "cf_state_was_changed";
// Sentinel compared by identity to detect "no state loaded yet" / "no initial state provided".
const DEFAULT_STATE = {};
/**
 * Returns the store of the current agent context.
 * Outside of any agent context an object with `agent`, `connection`,
 * `request` and `email` all undefined is returned, so callers can always
 * destructure the result.
 */
function getCurrentAgent() {
  const emptyContext = {
    agent: void 0,
    connection: void 0,
    request: void 0,
    email: void 0
  };
  return agentContext.getStore() || emptyContext;
}
/**
 * Wraps a method so that it runs within the agent context, ensuring getCurrentAgent() works properly.
 * The agent is taken from the `this` the wrapper is invoked with. When that
 * agent is already the current one the method is called directly; otherwise a
 * new context is entered that keeps the ambient connection/request/email.
 * @param method The method to wrap
 * @returns A wrapped method that runs within the agent context
 */
function withAgentContext(method) {
  return function (...args) {
    const ambient = getCurrentAgent();
    const invoke = () => method.apply(this, args);
    if (ambient.agent === this) return invoke();
    const scope = {
      agent: this,
      connection: ambient.connection,
      request: ambient.request,
      email: ambient.email
    };
    return agentContext.run(scope, () => invoke());
  };
}
/**
* Base class for creating Agent implementations
* @template Env Environment type containing bindings
* @template State State type to store within the Agent
*/
var Agent = class Agent extends Server {
/**
* Current state of the Agent
*/
get state() {
if (this._state !== DEFAULT_STATE) return this._state;
const wasChanged = this.sql`
SELECT state FROM cf_agents_state WHERE id = ${STATE_WAS_CHANGED}
`;
const result = this.sql`
SELECT state FROM cf_agents_state WHERE id = ${STATE_ROW_ID}
`;
if (wasChanged[0]?.state === "true" || result[0]?.state) {
const state = result[0]?.state;
this._state = JSON.parse(state);
return this._state;
}
if (this.initialState === DEFAULT_STATE) return;
this.setState(this.initialState);
return this.initialState;
}
// Default options assigned to every Agent class.
// NOTE(review): presumably consumed by the partyserver Server base class to enable hibernation — confirm.
static {
this.options = { hibernate: true };
}
/**
* Execute SQL queries against the Agent's database
* @template T Type of the returned rows
* @param strings SQL query template strings
* @param values Values to be inserted into the query
* @returns Array of query results
*/
sql(strings, ...values) {
let query = "";
try {
query = strings.reduce((acc, str, i) => acc + str + (i < values.length ? "?" : ""), "");
return [...this.ctx.storage.sql.exec(query, ...values)];
} catch (e) {
console.error(`failed to execute sql query: ${query}`, e);
throw this.onError(e);
}
}
/**
 * Sets up instance fields, creates the cf_agents_* tables, creates the MCP client manager,
 * and replaces onRequest/onMessage/onConnect/onStart with wrappers that run the original
 * handlers inside the agent context.
 * @param ctx Passed to the Server base constructor; `ctx.storage` is used for SQL and alarms
 * @param env Passed to the Server base constructor
 */
constructor(ctx, env) {
super(ctx, env);
// In-memory state cache; the DEFAULT_STATE sentinel means nothing has been loaded or set yet.
this._state = DEFAULT_STATE;
this._disposables = new DisposableStore();
// Set by destroy(); the alarm handler checks it to stop touching storage afterwards.
this._destroyed = false;
// Concrete class of this instance; its name is used for the MCP client name and for callback URLs.
this._ParentClass = Object.getPrototypeOf(this).constructor;
this.initialState = DEFAULT_STATE;
this.observability = genericObservability;
// Re-entrancy guard for _flushQueue().
this._flushingQueue = false;
// Alarm handler: runs every schedule whose time is due, then arms the alarm for the next one.
this.alarm = async () => {
const now = Math.floor(Date.now() / 1e3);
const result = this.sql`
SELECT * FROM cf_agents_schedules WHERE time <= ${now}
`;
if (result && Array.isArray(result)) for (const row of result) {
const callback = this[row.callback];
if (!callback) {
console.error(`callback ${row.callback} not found`);
continue;
}
await agentContext.run({
agent: this,
connection: void 0,
request: void 0,
email: void 0
}, async () => {
try {
this.observability?.emit({
displayMessage: `Schedule ${row.id} executed`,
id: nanoid(),
payload: {
callback: row.callback,
id: row.id
},
timestamp: Date.now(),
type: "schedule:execute"
}, this.ctx);
await callback.bind(this)(JSON.parse(row.payload), row);
} catch (e) {
// Logged and swallowed so one failing callback does not stop the remaining due schedules.
console.error(`error executing callback "${row.callback}"`, e);
}
});
// Cron schedules are moved to their next occurrence; every other type is one-shot and removed.
if (row.type === "cron") {
if (this._destroyed) return;
const nextExecutionTime = getNextCronTime(row.cron);
const nextTimestamp = Math.floor(nextExecutionTime.getTime() / 1e3);
this.sql`
UPDATE cf_agents_schedules SET time = ${nextTimestamp} WHERE id = ${row.id}
`;
} else {
if (this._destroyed) return;
this.sql`
DELETE FROM cf_agents_schedules WHERE id = ${row.id}
`;
}
}
if (this._destroyed) return;
await this._scheduleNextAlarm();
};
// Custom methods are wrapped on the prototype, so this only has to happen once per concrete class.
if (!wrappedClasses.has(this.constructor)) {
this._autoWrapCustomMethods();
wrappedClasses.add(this.constructor);
}
// Create the tables used for MCP servers, state, queued tasks and schedules (no-ops when they already exist).
this.sql`
CREATE TABLE IF NOT EXISTS cf_agents_mcp_servers (
id TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
server_url TEXT NOT NULL,
callback_url TEXT NOT NULL,
client_id TEXT,
auth_url TEXT,
server_options TEXT
)
`;
this.sql`
CREATE TABLE IF NOT EXISTS cf_agents_state (
id TEXT PRIMARY KEY NOT NULL,
state TEXT
)
`;
this.sql`
CREATE TABLE IF NOT EXISTS cf_agents_queues (
id TEXT PRIMARY KEY NOT NULL,
payload TEXT,
callback TEXT,
created_at INTEGER DEFAULT (unixepoch())
)
`;
this.sql`
CREATE TABLE IF NOT EXISTS cf_agents_schedules (
id TEXT PRIMARY KEY NOT NULL DEFAULT (randomblob(9)),
callback TEXT,
payload TEXT,
type TEXT NOT NULL CHECK(type IN ('scheduled', 'delayed', 'cron')),
time INTEGER,
delayInSeconds INTEGER,
cron TEXT,
created_at INTEGER DEFAULT (unixepoch())
)
`;
this.mcp = new MCPClientManager(this._ParentClass.name, "0.0.1", { storage: this.ctx.storage });
// Push the MCP server list to connected clients whenever a server changes state.
this._disposables.add(this.mcp.onServerStateChanged(async () => {
this.broadcastMcpServers();
}));
// Forward MCP observability events to this agent's observability sink.
this._disposables.add(this.mcp.onObservabilityEvent((event) => {
this.observability?.emit(event);
}));
// HTTP requests: MCP OAuth callbacks are answered here; everything else goes to the original onRequest.
const _onRequest = this.onRequest.bind(this);
this.onRequest = (request) => {
return agentContext.run({
agent: this,
connection: void 0,
request,
email: void 0
}, async () => {
await this.mcp.ensureJsonSchema();
const oauthResponse = await this.handleMcpOAuthCallback(request);
if (oauthResponse) return oauthResponse;
return this._tryCatch(() => _onRequest(request));
});
};
// WebSocket messages: state updates and RPC requests are handled here; everything else goes to the original onMessage.
const _onMessage = this.onMessage.bind(this);
this.onMessage = async (connection, message) => {
return agentContext.run({
agent: this,
connection,
request: void 0,
email: void 0
}, async () => {
await this.mcp.ensureJsonSchema();
// Non-string messages and strings that are not valid JSON are passed through untouched.
if (typeof message !== "string") return this._tryCatch(() => _onMessage(connection, message));
let parsed;
try {
parsed = JSON.parse(message);
} catch (_e) {
return this._tryCatch(() => _onMessage(connection, message));
}
// A client-sent state update is applied with the sending connection as its source.
if (isStateUpdateMessage(parsed)) {
this._setStateInternal(parsed.state, connection);
return;
}
if (isRPCRequest(parsed)) {
try {
const { id, method, args } = parsed;
const methodFn = this[method];
if (typeof methodFn !== "function") throw new Error(`Method ${method} does not exist`);
// Only methods registered in callableMetadata may be invoked over RPC.
if (!this._isCallable(method)) throw new Error(`Method ${method} is not callable`);
const metadata = callableMetadata.get(methodFn);
// Streaming methods receive a StreamingResponse as their first argument; no response is sent from here.
if (metadata?.streaming) {
const stream = new StreamingResponse(connection, id);
await methodFn.apply(this, [stream, ...args]);
return;
}
const result = await methodFn.apply(this, args);
this.observability?.emit({
displayMessage: `RPC call to ${method}`,
id: nanoid(),
payload: {
method,
streaming: metadata?.streaming
},
timestamp: Date.now(),
type: "rpc"
}, this.ctx);
const response = {
done: true,
id,
result,
success: true,
type: MessageType.RPC
};
connection.send(JSON.stringify(response));
} catch (e) {
// Failures are reported back to the caller as an unsuccessful RPC response and logged.
const response = {
error: e instanceof Error ? e.message : "Unknown error occurred",
id: parsed.id,
success: false,
type: MessageType.RPC
};
connection.send(JSON.stringify(response));
console.error("RPC error:", e);
}
return;
}
return this._tryCatch(() => _onMessage(connection, message));
});
};
// New connections: send the current state (if any) and the MCP server list before calling the original onConnect.
const _onConnect = this.onConnect.bind(this);
this.onConnect = (connection, ctx$1) => {
return agentContext.run({
agent: this,
connection,
request: ctx$1.request,
email: void 0
}, async () => {
if (this.state) connection.send(JSON.stringify({
state: this.state,
type: MessageType.CF_AGENT_STATE
}));
connection.send(JSON.stringify({
mcp: this.getMcpServers(),
type: MessageType.CF_AGENT_MCP_SERVERS
}));
this.observability?.emit({
displayMessage: "Connection established",
id: nanoid(),
payload: { connectionId: connection.id },
timestamp: Date.now(),
type: "connect"
}, this.ctx);
return this._tryCatch(() => _onConnect(connection, ctx$1));
});
};
// Startup: restore MCP connections from storage and broadcast them before calling the original onStart.
const _onStart = this.onStart.bind(this);
this.onStart = async (props) => {
return agentContext.run({
agent: this,
connection: void 0,
request: void 0,
email: void 0
}, async () => {
await this._tryCatch(async () => {
await this.mcp.restoreConnectionsFromStorage(this.name);
this.broadcastMcpServers();
return _onStart(props);
});
});
};
}
_setStateInternal(state, source = "server") {
this._state = state;
this.sql`
INSERT OR REPLACE INTO cf_agents_state (id, state)
VALUES (${STATE_ROW_ID}, ${JSON.stringify(state)})
`;
this.sql`
INSERT OR REPLACE INTO cf_agents_state (id, state)
VALUES (${STATE_WAS_CHANGED}, ${JSON.stringify(true)})
`;
this.broadcast(JSON.stringify({
state,
type: MessageType.CF_AGENT_STATE
}), source !== "server" ? [source.id] : []);
return this._tryCatch(() => {
const { connection, request, email } = agentContext.getStore() || {};
return agentContext.run({
agent: this,
connection,
request,
email
}, async () => {
this.observability?.emit({
displayMessage: "State updated",
id: nanoid(),
payload: {},
timestamp: Date.now(),
type: "state:update"
}, this.ctx);
return this.onStateUpdate(state, source);
});
});
}
/**
* Update the Agent's state, recording the update as coming from the "server".
* The promise returned by _setStateInternal() is neither awaited nor returned,
* so this method returns undefined.
* @param state New state to set
*/
setState(state) {
this._setStateInternal(state, "server");
}
/**
* Called when the Agent's state is updated.
* The base implementation does nothing; override it to react to state changes.
* @param state Updated state
* @param source Source of the state update ("server" or a client connection)
*/
onStateUpdate(state, source) {}
/**
* Called when the Agent receives an email via routeAgentEmail()
* Override this method to handle incoming emails
* @param email Email message to process
*/
async _onEmail(email) {
return agentContext.run({
agent: this,
connection: void 0,
request: void 0,
email
}, async () => {
if ("onEmail" in this && typeof this.onEmail === "function") return this._tryCatch(() => this.onEmail(email));
else {
console.log("Received email from:", email.from, "to:", email.to);
console.log("Subject:", email.headers.get("subject"));
console.log("Implement onEmail(email: AgentEmail): Promise<void> in your agent to process emails");
}
});
}
/**
* Reply to an email
* @param email The email to reply to
* @param options Options for the reply
* @returns void
*/
async replyToEmail(email, options) {
return this._tryCatch(async () => {
const agentName = camelCaseToKebabCase(this._ParentClass.name);
const agentId = this.name;
const { createMimeMessage } = await import("mimetext");
const msg = createMimeMessage();
msg.setSender({
addr: email.to,
name: options.fromName
});
msg.setRecipient(email.from);
msg.setSubject(options.subject || `Re: ${email.headers.get("subject")}` || "No subject");
msg.addMessage({
contentType: options.contentType || "text/plain",
data: options.body
});
const messageId = `<${agentId}@${email.from.split("@")[1]}>`;
msg.setHeader("In-Reply-To", email.headers.get("Message-ID"));
msg.setHeader("Message-ID", messageId);
msg.setHeader("X-Agent-Name", agentName);
msg.setHeader("X-Agent-ID", agentId);
if (options.headers) for (const [key, value] of Object.entries(options.headers)) msg.setHeader(key, value);
await email.reply({
from: email.to,
raw: msg.asRaw(),
to: email.from
});
});
}
async _tryCatch(fn) {
try {
return await fn();
} catch (e) {
throw this.onError(e);
}
}
/**
* Automatically wrap custom methods with agent context
* This ensures getCurrentAgent() works in all custom methods without decorators.
* Wrapped functions are installed on `this.constructor.prototype`, so the effect is
* shared by all instances of the concrete class.
*/
_autoWrapCustomMethods() {
// Collect every property name defined on the Agent and Server prototype chains; those are never wrapped.
const basePrototypes = [Agent.prototype, Server.prototype];
const baseMethods = /* @__PURE__ */ new Set();
for (const baseProto of basePrototypes) {
let proto$1 = baseProto;
while (proto$1 && proto$1 !== Object.prototype) {
const methodNames = Object.getOwnPropertyNames(proto$1);
for (const methodName of methodNames) baseMethods.add(methodName);
proto$1 = Object.getPrototypeOf(proto$1);
}
}
// Walk this instance's prototype chain (at most 10 levels) looking for custom methods.
let proto = Object.getPrototypeOf(this);
let depth = 0;
while (proto && proto !== Object.prototype && depth < 10) {
const methodNames = Object.getOwnPropertyNames(proto);
for (const methodName of methodNames) {
const descriptor = Object.getOwnPropertyDescriptor(proto, methodName);
// Skip base-class names, underscore-prefixed names, getters and non-function properties.
if (baseMethods.has(methodName) || methodName.startsWith("_") || !descriptor || !!descriptor.get || typeof descriptor.value !== "function") continue;
// The function is looked up through the instance (`this[methodName]`), not through `proto`.
const wrappedFunction = withAgentContext(this[methodName]);
// Carry callable metadata over so the wrapped function is still recognised by _isCallable().
if (this._isCallable(methodName)) callableMetadata.set(wrappedFunction, callableMetadata.get(this[methodName]));
this.constructor.prototype[methodName] = wrappedFunction;
}
proto = Object.getPrototypeOf(proto);
depth++;
}
}
/**
 * Default error hook: logs the error together with a hint on how to override, then rethrows it.
 * Called either as onError(error) for server errors or as onError(connection, error)
 * for websocket connection errors.
 * @param connectionOrError The error, or the connection when `error` is given
 * @param error The error when the first argument is a connection
 * @throws Always rethrows the error it was given
 */
onError(connectionOrError, error) {
  if (!(connectionOrError && error)) {
    console.error("Error on server:", connectionOrError);
    console.error("Override onError(error) to handle server errors");
    throw connectionOrError;
  }
  console.error("Error on websocket connection:", connectionOrError.id, error);
  console.error("Override onError(connection, error) to handle websocket connection errors");
  throw error;
}
/**
* Render content (not implemented in base class)
* @throws Always throws Error("Not implemented")
*/
render() {
throw new Error("Not implemented");
}
/**
* Queue a task to be executed in the future
* @param payload Payload to pass to the callback
* @param callback Name of the method to call
* @returns The ID of the queued task
*/
async queue(callback, payload) {
const id = nanoid(9);
if (typeof callback !== "string") throw new Error("Callback must be a string");
if (typeof this[callback] !== "function") throw new Error(`this.${callback} is not a function`);
this.sql`
INSERT OR REPLACE INTO cf_agents_queues (id, payload, callback)
VALUES (${id}, ${JSON.stringify(payload)}, ${callback})
`;
this._flushQueue().catch((e) => {
console.error("Error flushing queue:", e);
});
return id;
}
async _flushQueue() {
if (this._flushingQueue) return;
this._flushingQueue = true;
while (true) {
const result = this.sql`
SELECT * FROM cf_agents_queues
ORDER BY created_at ASC
`;
if (!result || result.length === 0) break;
for (const row of result || []) {
const callback = this[row.callback];
if (!callback) {
console.error(`callback ${row.callback} not found`);
continue;
}
const { connection, request, email } = agentContext.getStore() || {};
await agentContext.run({
agent: this,
connection,
request,
email
}, async () => {
await callback.bind(this)(JSON.parse(row.payload), row);
await this.dequeue(row.id);
});
}
}
this._flushingQueue = false;
}
/**
* Dequeue a task by ID: deletes the matching row from cf_agents_queues.
* @param id ID of the task to dequeue
*/
async dequeue(id) {
this.sql`DELETE FROM cf_agents_queues WHERE id = ${id}`;
}
/**
* Dequeue all tasks: deletes every row from cf_agents_queues.
*/
async dequeueAll() {
this.sql`DELETE FROM cf_agents_queues`;
}
/**
* Dequeue all tasks by callback: deletes every queued task registered for the given callback name.
* @param callback Name of the callback to dequeue
*/
async dequeueAllByCallback(callback) {
this.sql`DELETE FROM cf_agents_queues WHERE callback = ${callback}`;
}
/**
* Get a queued task by ID
* @param id ID of the task to get
* @returns The task or undefined if not found
*/
async getQueue(id) {
const result = this.sql`
SELECT * FROM cf_agents_queues WHERE id = ${id}
`;
return result ? {
...result[0],
payload: JSON.parse(result[0].payload)
} : void 0;
}
/**
* Get all queues by key and value
* @param key Key to filter by
* @param value Value to filter by
* @returns Array of matching QueueItem objects
*/
async getQueues(key, value) {
return this.sql`
SELECT * FROM cf_agents_queues
`.filter((row) => JSON.parse(row.payload)[key] === value);
}
/**
* Schedule a task to be executed in the future
* @template T Type of the payload data
* @param when When to execute the task (Date, seconds delay, or cron expression)
* @param callback Name of the method to call
* @param payload Data to pass to the callback
* @returns Schedule object representing the scheduled task
*/
async schedule(when, callback, payload) {
const id = nanoid(9);
const emitScheduleCreate = (schedule) => this.observability?.emit({
displayMessage: `Schedule ${schedule.id} created`,
id: nanoid(),
payload: {
callback,
id
},
timestamp: Date.now(),
type: "schedule:create"
}, this.ctx);
if (typeof callback !== "string") throw new Error("Callback must be a string");
if (typeof this[callback] !== "function") throw new Error(`this.${callback} is not a function`);
if (when instanceof Date) {
const timestamp = Math.floor(when.getTime() / 1e3);
this.sql`
INSERT OR REPLACE INTO cf_agents_schedules (id, callback, payload, type, time)
VALUES (${id}, ${callback}, ${JSON.stringify(payload)}, 'scheduled', ${timestamp})
`;
await this._scheduleNextAlarm();
const schedule = {
callback,
id,
payload,
time: timestamp,
type: "scheduled"
};
emitScheduleCreate(schedule);
return schedule;
}
if (typeof when === "number") {
const time = new Date(Date.now() + when * 1e3);
const timestamp = Math.floor(time.getTime() / 1e3);
this.sql`
INSERT OR REPLACE INTO cf_agents_schedules (id, callback, payload, type, delayInSeconds, time)
VALUES (${id}, ${callback}, ${JSON.stringify(payload)}, 'delayed', ${when}, ${timestamp})
`;
await this._scheduleNextAlarm();
const schedule = {
callback,
delayInSeconds: when,
id,
payload,
time: timestamp,
type: "delayed"
};
emitScheduleCreate(schedule);
return schedule;
}
if (typeof when === "string") {
const nextExecutionTime = getNextCronTime(when);
const timestamp = Math.floor(nextExecutionTime.getTime() / 1e3);
this.sql`
INSERT OR REPLACE INTO cf_agents_schedules (id, callback, payload, type, cron, time)
VALUES (${id}, ${callback}, ${JSON.stringify(payload)}, 'cron', ${when}, ${timestamp})
`;
await this._scheduleNextAlarm();
const schedule = {
callback,
cron: when,
id,
payload,
time: timestamp,
type: "cron"
};
emitScheduleCreate(schedule);
return schedule;
}
throw new Error("Invalid schedule type");
}
/**
* Get a scheduled task by ID
* @template T Type of the payload data
* @param id ID of the scheduled task
* @returns The Schedule object or undefined if not found
*/
async getSchedule(id) {
const result = this.sql`
SELECT * FROM cf_agents_schedules WHERE id = ${id}
`;
if (!result || result.length === 0) return;
return {
...result[0],
payload: JSON.parse(result[0].payload)
};
}
/**
* Get scheduled tasks matching the given criteria
* @template T Type of the payload data
* @param criteria Criteria to filter schedules
* @returns Array of matching Schedule objects
*/
getSchedules(criteria = {}) {
let query = "SELECT * FROM cf_agents_schedules WHERE 1=1";
const params = [];
if (criteria.id) {
query += " AND id = ?";
params.push(criteria.id);
}
if (criteria.type) {
query += " AND type = ?";
params.push(criteria.type);
}
if (criteria.timeRange) {
query += " AND time >= ? AND time <= ?";
const start = criteria.timeRange.start || /* @__PURE__ */ new Date(0);
const end = criteria.timeRange.end || /* @__PURE__ */ new Date(999999999999999);
params.push(Math.floor(start.getTime() / 1e3), Math.floor(end.getTime() / 1e3));
}
return this.ctx.storage.sql.exec(query, ...params).toArray().map((row) => ({
...row,
payload: JSON.parse(row.payload)
}));
}
/**
* Cancel a scheduled task
* @param id ID of the task to cancel
* @returns true if the task was cancelled, false if the task was not found
*/
async cancelSchedule(id) {
const schedule = await this.getSchedule(id);
if (!schedule) return false;
this.observability?.emit({
displayMessage: `Schedule ${id} cancelled`,
id: nanoid(),
payload: {
callback: schedule.callback,
id: schedule.id
},
timestamp: Date.now(),
type: "schedule:cancel"
}, this.ctx);
this.sql`DELETE FROM cf_agents_schedules WHERE id = ${id}`;
await this._scheduleNextAlarm();
return true;
}
async _scheduleNextAlarm() {
const result = this.sql`
SELECT time FROM cf_agents_schedules
WHERE time >= ${Math.floor(Date.now() / 1e3)}
ORDER BY time ASC
LIMIT 1
`;
if (!result) return;
if (result.length > 0 && "time" in result[0]) {
const nextTime = result[0].time * 1e3;
await this.ctx.storage.setAlarm(nextTime);
}
}
/**
* Destroy the Agent, removing all state and scheduled tasks
* Drops the cf_agents_* tables, clears the alarm and all storage, disposes the MCP
* client and event subscriptions, and finally aborts the context.
*/
async destroy() {
this.sql`DROP TABLE IF EXISTS cf_agents_mcp_servers`;
this.sql`DROP TABLE IF EXISTS cf_agents_state`;
this.sql`DROP TABLE IF EXISTS cf_agents_schedules`;
this.sql`DROP TABLE IF EXISTS cf_agents_queues`;
await this.ctx.storage.deleteAlarm();
await this.ctx.storage.deleteAll();
this._disposables.dispose();
await this.mcp.dispose();
// From here on the alarm handler stops updating or re-arming schedules.
this._destroyed = true;
// The abort is deferred to a timer callback, so the "destroy" event below is emitted before it runs.
setTimeout(() => {
this.ctx.abort("destroyed");
}, 0);
this.observability?.emit({
displayMessage: "Agent destroyed",
id: nanoid(),
payload: {},
timestamp: Date.now(),
type: "destroy"
}, this.ctx);
}
/**
* Check whether a method of this Agent has been marked as callable
* @param method Name of the method to check
* @returns true if the function stored under that name is registered in callableMetadata
*/
_isCallable(method) {
return callableMetadata.has(this[method]);
}
/**
* Connect to a new MCP Server
*
* @param serverName Name of the MCP server
* @param url MCP Server SSE URL
* @param callbackHost Base host for the agent, used for the redirect URI. If not provided, will be derived from the current request.
* @param agentsPrefix agents routing prefix if not using `agents`
* @param options MCP client and transport options
* @returns Server id and state - either "authenticating" with authUrl, or "ready"
* @throws If connection or discovery fails
*/
async addMcpServer(serverName, url, callbackHost, agentsPrefix = "agents", options) {
let resolvedCallbackHost = callbackHost;
if (!resolvedCallbackHost) {
const { request } = getCurrentAgent();
if (!request) throw new Error("callbackHost is required when not called within a request context");
const requestUrl = new URL(request.url);
resolvedCallbackHost = `${requestUrl.protocol}//${requestUrl.host}`;
}
const callbackUrl = `${resolvedCallbackHost}/${agentsPrefix}/${camelCaseToKebabCase(this._ParentClass.name)}/${this.name}/callback`;
await this.mcp.ensureJsonSchema();
const id = nanoid(8);
const authProvider = new DurableObjectOAuthClientProvider(this.ctx.storage, this.name, callbackUrl);
authProvider.serverId = id;
const transportType = options?.transport?.type ?? "auto";
let headerTransportOpts = {};
if (options?.transport?.headers) headerTransportOpts = {
eventSourceInit: { fetch: (url$1, init) => fetch(url$1, {
...init,
headers: options?.transport?.headers
}) },
requestInit: { headers: options?.transport?.headers }
};
await this.mcp.registerServer(id, {
url,
name: serverName,
callbackUrl,
client: options?.client,
transport: {
...headerTransportOpts,
authProvider,
type: transportType
}
});
const result = await this.mcp.connectToServer(id);
if (result.state === MCPConnectionState.FAILED) throw new Error(`Failed to connect to MCP server at ${url}: ${result.error}`);
if (result.state === MCPConnectionState.AUTHENTICATING) return {
id,
state: result.state,
authUrl: result.authUrl
};
const discoverResult = await this.mcp.discoverIfConnected(id);
if (discoverResult && !discoverResult.success) throw new Error(`Failed to discover MCP server capabilities: ${discoverResult.error}`);
return {
id,
state: MCPConnectionState.READY
};
}
/**
* Remove an MCP server by delegating to the MCP client manager
* @param id ID of the MCP server, as returned by addMcpServer()
*/
async removeMcpServer(id) {
await this.mcp.removeServer(id);
}
getMcpServers() {
const mcpState = {
prompts: this.mcp.listPrompts(),
resources: this.mcp.listResources(),
servers: {},
tools: this.mcp.listTools()
};
const servers = this.mcp.listServers();
if (servers && Array.isArray(servers) && servers.length > 0) for (const server of servers) {
const serverConn = this.mcp.mcpConnections[server.id];
let defaultState = "not-connected";
if (!serverConn && server.auth_url) defaultState = "authenticating";
mcpState.servers[server.id] = {
auth_url: server.auth_url,
capabilities: serverConn?.serverCapabilities ?? null,
instructions: serverConn?.instructions ?? null,
name: server.name,
server_url: server.server_url,
state: serverConn?.connectionState ?? defaultState
};
}
return mcpState;
}
broadcastMcpServers() {
this.broadcast(JSON.stringify({
mcp: this.getMcpServers(),
type: MessageType.CF_AGENT_MCP_SERVERS
}));
}
/**
* Handle MCP OAuth callback request if it's an OAuth callback.
*
* This method encapsulates the entire OAuth callback flow:
* 1. Checks if the request is an MCP OAuth callback
* 2. Processes the OAuth code exchange
* 3. Establishes the connection if successful
* 4. Broadcasts MCP server state updates
* 5. Returns the appropriate HTTP response
*
* @param request The incoming HTTP request
* @returns Response if this was an OAuth callback, null otherwise
*/
async handleMcpOAuthCallback(request) {
if (!this.mcp.isCallbackRequest(request)) return null;
const result = await this.mcp.handleCallbackRequest(request);
if (result.authSuccess) this.mcp.establishConnection(result.serverId).catch((error) => {
console.error("[Agent handleMcpOAuthCallback] Connection establishment failed:", error);
});
this.broadcastMcpServers();
return this.handleOAuthCallbackResponse(result, request);
}
/**
* Handle OAuth callback response using MCPClientManager configuration
* @param result OAuth callback result
* @param request The original request (needed for base URL)
* @returns Response for the OAuth callback
*/
handleOAuthCallbackResponse(result, request) {
const config = this.mcp.getOAuthCallbackConfig();
if (config?.customHandler) return config.customHandler(result);
const baseOrigin = new URL(request.url).origin;
if (config?.successRedirect && result.authSuccess) try {
return Response.redirect(new URL(config.successRedirect, baseOrigin).href);
} catch (e) {
console.error("Invalid successRedirect URL:", config.successRedirect, e);
return Response.redirect(baseOrigin);
}
if (config?.errorRedirect && !result.authSuccess) try {
const errorUrl = `${config.errorRedirect}?error=${encodeURIComponent(result.authError || "Unknown error")}`;
return Response.redirect(new URL(errorUrl, baseOrigin).href);
} catch (e) {
console.error("Invalid errorRedirect URL:", config.errorRedirect, e);
return Response.redirect(baseOrigin);
}
return Response.redirect(baseOrigin);
}
};
// Constructors whose custom methods have already been wrapped by _autoWrapCustomMethods(), so wrapping happens once per class.
const wrappedClasses = /* @__PURE__ */ new Set();
/**
 * Route a request to the appropriate Agent.
 * With CORS enabled, OPTIONS requests are answered directly and the CORS headers
 * are added to every routed response except WebSocket upgrades.
 * @param request Request to route
 * @param env Environment containing Agent bindings
 * @param options Routing options; `cors` may be `true` (default headers) or an object of custom CORS headers
 * @returns Response from the Agent or undefined if no route matched
 */
async function routeAgentRequest(request, env, options) {
  let corsHeaders = options?.cors;
  if (corsHeaders === true) {
    corsHeaders = {
      "Access-Control-Allow-Credentials": "true",
      "Access-Control-Allow-Methods": "GET, POST, HEAD, OPTIONS",
      "Access-Control-Allow-Origin": "*",
      "Access-Control-Max-Age": "86400"
    };
  }
  if (request.method === "OPTIONS") {
    if (corsHeaders) return new Response(null, { headers: corsHeaders });
    console.warn("Received an OPTIONS request, but cors was not enabled. Pass `cors: true` or `cors: { ...custom cors headers }` to routeAgentRequest to enable CORS.");
  }
  const routed = await routePartykitRequest(request, env, {
    prefix: "agents",
    ...options
  });
  if (!routed || !corsHeaders) return routed;
  const upgradeValue = (headerName) => request.headers.get(headerName)?.toLowerCase();
  if (upgradeValue("upgrade") === "websocket" || upgradeValue("Upgrade") === "websocket") return routed;
  const headers = new Headers(routed.headers);
  Object.entries(corsHeaders).forEach(([key, value]) => {
    headers.set(key, value);
  });
  return new Response(routed.body, {
    status: routed.status,
    statusText: routed.statusText,
    headers
  });
}
/**
 * Create a resolver that determines the target agent from email headers.
 * Sources are tried in this order:
 * 1. `message-id` of the form `<agentId@domain>`
 * 2. `references` containing `<base64Id@domain>` (the id is decoded to hex)
 * 3. the `x-agent-name` / `x-agent-id` header pair
 * For the first two, the agent name is the first label of the domain.
 * @returns A function that resolves the agent to route the email to, or null when nothing matches
 */
function createHeaderBasedEmailResolver() {
  const agentNameFromDomain = (domain) => domain.split(".")[0];
  return async (email, _env) => {
    const messageIdMatch = email.headers.get("message-id")?.match(/<([^@]+)@([^>]+)>/);
    if (messageIdMatch) {
      return {
        agentName: agentNameFromDomain(messageIdMatch[2]),
        agentId: messageIdMatch[1]
      };
    }
    const referencesMatch = email.headers.get("references")?.match(/<([A-Za-z0-9+/]{43}=)@([^>]+)>/);
    if (referencesMatch) {
      const [, base64Id, domain] = referencesMatch;
      return {
        agentName: agentNameFromDomain(domain),
        agentId: Buffer.from(base64Id, "base64").toString("hex")
      };
    }
    const agentName = email.headers.get("x-agent-name");
    const agentId = email.headers.get("x-agent-id");
    return agentName && agentId ? { agentName, agentId } : null;
  };
}
/**
 * Create a resolver that determines the target agent from the recipient address.
 * `name+id@domain` routes to agent `name` with id `id`; a plain `local@domain`
 * routes to `defaultAgentName` with id `local`.
 * @param defaultAgentName The default agent name to use if the email address does not contain a sub-address
 * @returns A function that resolves the agent to route the email to, or null for an unparseable address
 */
function createAddressBasedEmailResolver(defaultAgentName) {
  return async (email, _env) => {
    const parts = email.to.match(/^([^+@]+)(?:\+([^@]+))?@(.+)$/);
    if (!parts) return null;
    const localPart = parts[1];
    const subAddress = parts[2];
    return subAddress
      ? { agentName: localPart, agentId: subAddress }
      : { agentName: defaultAgentName, agentId: localPart };
  };
}
/**
 * Create a resolver that routes every email to one fixed agent.
 * @param agentName The name of the agent to route the email to
 * @param agentId The id of the agent to route the email to
 * @returns A function that always resolves to the given agentName / agentId pair
 */
function createCatchAllEmailResolver(agentName, agentId) {
  return async () => {
    const target = { agentName, agentId };
    return target;
  };
}
// Per-env cache of the binding lookup map built by routeAgentEmail(); keyed weakly by the env object.
const agentMapCache = /* @__PURE__ */ new WeakMap();
/**
 * Route an email to the appropriate Agent.
 * The resolver decides which agent receives the email; when it yields nothing the
 * message is dropped with a warning. Agent namespaces are looked up in `env` by
 * binding name or by its kebab-case form.
 * @param email The email to route
 * @param env The environment containing the Agent bindings
 * @param options The options for routing the email (must provide `resolver`)
 * @returns A promise that resolves when the email has been routed
 * @throws If the resolved agent name does not match any namespace binding in `env`
 */
async function routeAgentEmail(email, env, options) {
  const routingInfo = await options.resolver(email, env);
  if (!routingInfo) {
    console.warn("No routing information found for email, dropping message");
    return;
  }
  if (!agentMapCache.has(env)) {
    // Anything exposing an idFromName() function is treated as an agent namespace binding.
    const looksLikeNamespace = (value) => value && typeof value === "object" && "idFromName" in value && typeof value.idFromName === "function";
    const map = {};
    Object.entries(env).forEach(([key, value]) => {
      if (!looksLikeNamespace(value)) return;
      map[key] = value;
      map[camelCaseToKebabCase(key)] = value;
    });
    agentMapCache.set(env, map);
  }
  const agentMap = agentMapCache.get(env);
  const namespace = agentMap[routingInfo.agentName];
  if (!namespace) {
    const availableAgents = Object.keys(agentMap).filter((key) => !key.includes("-")).join(", ");
    throw new Error(`Agent namespace '${routingInfo.agentName}' not found in environment. Available agents: ${availableAgents}`);
  }
  const agent = await getAgentByName(namespace, routingInfo.agentId);
  // Reads the raw message stream to its end and returns the bytes as one Uint8Array.
  const readRaw = async () => {
    const reader = email.raw.getReader();
    const chunks = [];
    for (;;) {
      const { value, done } = await reader.read();
      if (value) chunks.push(value);
      if (done) break;
    }
    let totalLength = 0;
    for (const chunk of chunks) totalLength += chunk.length;
    const combined = new Uint8Array(totalLength);
    chunks.reduce((offset, chunk) => {
      combined.set(chunk, offset);
      return offset + chunk.length;
    }, 0);
    return combined;
  };
  const serialisableEmail = {
    getRaw: readRaw,
    headers: email.headers,
    rawSize: email.rawSize,
    setReject: (reason) => {
      email.setReject(reason);
    },
    forward: (rcptTo, headers) => email.forward(rcptTo, headers),
    reply: (options$1) => email.reply(new EmailMessage(options$1.from, options$1.to, options$1.raw)),
    from: email.from,
    to: email.to
  };
  await agent._onEmail(serialisableEmail);
}
/**
* Get or create an Agent by name
* Thin wrapper that forwards its arguments unchanged to partyserver's getServerByName().
* @template Env Environment type containing bindings
* @template T Type of the Agent class
* @param namespace Agent namespace
* @param name Name of the Agent instance
* @param options Options for Agent creation
* @returns Promise resolving to an Agent instance stub
*/
async function getAgentByName(namespace, name, options) {
return getServerByName(namespace, name, options);
}
/**
 * A wrapper for streaming responses in callable methods.
 * Every chunk is sent over the connection as a successful RPC message carrying
 * the id of the originating request; the last message is flagged `done: true`.
 */
var StreamingResponse = class {
  constructor(connection, id) {
    this._closed = false;
    this._connection = connection;
    this._id = id;
  }
  /**
   * Serialize one RPC frame and push it to the client.
   * @param result The chunk carried by the frame
   * @param done Whether this frame terminates the stream
   */
  #transmit(result, done) {
    this._connection.send(JSON.stringify({
      done,
      id: this._id,
      result,
      success: true,
      type: MessageType.RPC
    }));
  }
  /**
   * Send a chunk of data to the client
   * @param chunk The data to send
   * @throws If the stream has already been ended
   */
  send(chunk) {
    if (this._closed) throw new Error("StreamingResponse is already closed");
    this.#transmit(chunk, false);
  }
  /**
   * End the stream and send the final chunk (if any)
   * @param finalChunk Optional final chunk of data to send
   * @throws If the stream has already been ended
   */
  end(finalChunk) {
    if (this._closed) throw new Error("StreamingResponse is already closed");
    this._closed = true;
    this.#transmit(finalChunk, true);
  }
};
//#endregion
export { createCatchAllEmailResolver as a, getCurrentAgent as c, unstable_callable as d, genericObservability as f, createAddressBasedEmailResolver as i, routeAgentEmail as l, StreamingResponse as n, createHeaderBasedEmailResolver as o, callable as r, getAgentByName as s, Agent as t, routeAgentRequest as u };
//# sourceMappingURL=src-BZDh910Z.js.map