/**
* Admin AI Chat Handler
*
* Handles AI chat sessions with MCP tool execution.
* - Session management (D1 with KV fallback)
 * - Tool execution via the MCP Durable Object (direct RPC, no HTTP round trip)
* - Streaming responses via SSE
*/
import type {
  Env,
  ChatMessage,
  AdminChatSession,
} from '../types';
import { chat } from '../lib/ai';
import { getToolsMetadata } from '../tools';
import {
  getOrCreateConversation,
  getMessages,
  addMessage,
  toChatMessages,
  updateConversationMetadata,
  deleteConversation as deleteConversationD1,
  listConversations,
} from '../lib/memory';
// TTL for sessions stored in KV (legacy path): 7 days. D1 conversations are not expired here.
const SESSION_TTL_SECONDS = 7 * 24 * 60 * 60;
/**
* Check if D1 conversation memory is enabled
*/
function isMemoryEnabled(env: Env): boolean {
return !!(env.DB && env.ENABLE_CONVERSATION_MEMORY === 'true');
}
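// Assumed bindings (names taken from the Env type used above): a D1 binding named
// DB, an ENABLE_CONVERSATION_MEMORY var, and an OAUTH_KV namespace for the legacy
// fallback. A minimal wrangler.toml sketch might look like this; the database and
// namespace identifiers are deployment-specific:
//
//   [[d1_databases]]
//   binding = "DB"
//   database_name = "<your-database>"
//   database_id = "<your-database-id>"
//
//   [[kv_namespaces]]
//   binding = "OAUTH_KV"
//   id = "<your-kv-namespace-id>"
//
//   [vars]
//   ENABLE_CONVERSATION_MEMORY = "true"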
/**
* Load a chat session (D1 or KV)
*/
export async function loadChatSession(
env: Env,
sessionId: string,
adminEmail?: string
): Promise<AdminChatSession | null> {
// Use D1 if enabled
if (isMemoryEnabled(env)) {
return loadChatSessionFromD1(env.DB!, sessionId, adminEmail);
}
// Fallback to KV
return loadChatSessionFromKV(env.OAUTH_KV, sessionId);
}
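// Usage sketch for loadChatSession (hypothetical route handler; `env`, `sessionId`,
// and `adminEmail` are assumed to come from the surrounding request context):
//
//   const session =
//     (await loadChatSession(env, sessionId, adminEmail)) ??
//     (await createChatSession(env, adminEmail));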
/**
* Load chat session from D1
*/
async function loadChatSessionFromD1(
db: D1Database,
sessionId: string,
adminEmail?: string
): Promise<AdminChatSession | null> {
try {
const conversation = await getOrCreateConversation(db, sessionId, {
source: 'admin',
userEmail: adminEmail,
});
const messages = await getMessages(db, conversation.id);
return {
id: conversation.id,
      adminEmail: (conversation.metadata?.userEmail as string | undefined) || adminEmail || '',
messages: toChatMessages(messages),
createdAt: conversation.createdAt * 1000,
updatedAt: conversation.updatedAt * 1000,
provider: conversation.metadata?.provider as string,
model: conversation.metadata?.model as string,
};
} catch (error) {
console.error('D1 load error:', error);
return null;
}
}
/**
* Load chat session from KV (legacy)
*/
async function loadChatSessionFromKV(
kv: KVNamespace,
sessionId: string
): Promise<AdminChatSession | null> {
const key = `chat:${sessionId}`;
const data = await kv.get(key, 'json');
return data as AdminChatSession | null;
}
/**
* Save a chat session (D1 or KV)
*/
export async function saveChatSession(
env: Env,
session: AdminChatSession
): Promise<void> {
if (isMemoryEnabled(env)) {
await saveChatSessionToD1(env.DB!, session);
} else {
await saveChatSessionToKV(env.OAUTH_KV, session);
}
}
/**
* Save chat session to D1
*/
async function saveChatSessionToD1(
db: D1Database,
session: AdminChatSession
): Promise<void> {
// Update metadata
await updateConversationMetadata(db, session.id, {
source: 'admin',
userEmail: session.adminEmail,
provider: session.provider,
model: session.model,
});
  // Messages are persisted incrementally via addSessionMessage(), so this
  // function only needs to refresh the conversation metadata.
}
/**
* Save chat session to KV (legacy)
*/
async function saveChatSessionToKV(
kv: KVNamespace,
session: AdminChatSession
): Promise<void> {
const key = `chat:${session.id}`;
await kv.put(key, JSON.stringify(session), {
expirationTtl: SESSION_TTL_SECONDS,
});
}
/**
* Create a new chat session
*/
export async function createChatSession(
env: Env,
adminEmail: string,
provider?: string,
model?: string
): Promise<AdminChatSession> {
const id = crypto.randomUUID();
const now = Date.now();
// If D1 enabled, create conversation
if (isMemoryEnabled(env)) {
await getOrCreateConversation(env.DB!, id, {
source: 'admin',
userEmail: adminEmail,
provider: provider || 'cloudflare',
model: model || '@cf/meta/llama-4-scout-17b-16e-instruct',
});
}
return {
id,
adminEmail,
messages: [],
createdAt: now,
updatedAt: now,
provider: provider || 'cloudflare',
model: model || '@cf/meta/llama-4-scout-17b-16e-instruct',
};
}
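// Example for createChatSession. Provider and model are optional; the defaults are
// Cloudflare Workers AI with the Llama Scout model above. The 'openai' value comes
// from the provider union accepted by chat(); the model string is illustrative:
//
//   const defaultSession = await createChatSession(env, adminEmail);
//   const openaiSession = await createChatSession(env, adminEmail, 'openai', 'gpt-4o-mini');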
/**
* Add a message to session (D1 or in-memory)
*/
export async function addSessionMessage(
env: Env,
session: AdminChatSession,
message: ChatMessage
): Promise<void> {
// Add to in-memory session
session.messages.push(message);
session.updatedAt = Date.now();
// Persist to D1 if enabled
if (isMemoryEnabled(env)) {
await addMessage(env.DB!, session.id, {
conversationId: session.id,
role: message.role,
content: message.content,
toolCalls: message.toolCalls,
toolCallId: message.toolCallId,
});
}
}
/**
* Execute an MCP tool via Durable Object RPC (direct call, no HTTP)
*/
export async function executeMcpTool(
env: Env,
toolName: string,
args: Record<string, unknown>
): Promise<{ success: boolean; result?: unknown; error?: string }> {
try {
// Get DO stub and call executeTool directly via RPC
// Use a consistent ID for the "admin" session
const id = env.MCP_OBJECT.idFromName('admin-chat');
const stub = env.MCP_OBJECT.get(id);
// Call the executeTool method directly (Durable Object RPC)
const result = await (stub as unknown as {
executeTool(name: string, args: Record<string, unknown>): Promise<{
success: boolean;
result?: unknown;
error?: string;
}>;
}).executeTool(toolName, args);
return result;
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Tool execution failed',
};
}
}
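// Invocation sketch for executeMcpTool (the tool name and arguments here are
// illustrative; real tool names come from getToolsMetadata()):
//
//   const outcome = await executeMcpTool(env, 'get_time', {});
//   if (!outcome.success) {
//     console.error('Tool failed:', outcome.error);
//   }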
/**
* Format SSE data event
*/
function formatSSE(event: string, data: unknown): string {
return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}
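// Wire-format example: formatSSE('tool_call', { id: '1', name: 'echo' }) yields the
// frame below, followed by the blank line that terminates an SSE event ('echo' is an
// illustrative tool name):
//
//   event: tool_call
//   data: {"id":"1","name":"echo"}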
/**
* Handle a chat message and return a streaming response
*/
export async function handleChatMessage(
env: Env,
session: AdminChatSession,
userMessage: string
): Promise<ReadableStream<Uint8Array>> {
const encoder = new TextEncoder();
// Add user message to session (persists to D1 if enabled)
await addSessionMessage(env, session, {
role: 'user',
content: userMessage,
timestamp: Date.now(),
});
// Get tools metadata (single source of truth from tools module)
const toolsMetadata = getToolsMetadata();
const systemMessage: ChatMessage = {
role: 'system',
content: `You are an AI tool testing assistant. You help users test MCP tools on this server.
## Available Tools (${toolsMetadata.length} total)
### Utility Tools (no auth required)
${toolsMetadata.filter(t => !t.requiresAuth).map(t => `- **${t.name}**: ${t.description}`).join('\n')}
### OAuth Tools (require Google authentication)
${toolsMetadata.filter(t => t.requiresAuth).map(t => `- **${t.name}**: ${t.description}`).join('\n')}
## CRITICAL INSTRUCTIONS
**DO NOT call tools** when user asks:
- "list tools", "what tools", "available tools", "show tools" → Just respond with the list above
- General questions about the server → Just answer from your knowledge
**DO call tools** when user explicitly says:
- "test [tool]", "run [tool]", "try [tool]", "execute [tool]" → Call the tool **exactly once**, then show the result
- "get the time", "generate a uuid", "hash this text" → Call the appropriate tool **once**
- "test all tools" → Test each tool **once** each, calling them in sequence
**IMPORTANT**: Never call the same tool multiple times unless the user explicitly asks you to repeat it.
## Response Format
- Use markdown headings (## or ###) for sections
- Use **bold** for tool names and important info
- Use \`code\` for parameters and values
- Keep responses concise`,
};
// Max tool call rounds to prevent infinite loops
const MAX_TOOL_ROUNDS = 5;
return new ReadableStream({
async start(controller) {
try {
let toolRound = 0;
let hasMoreToolCalls = true;
// Multi-turn tool calling loop
while (hasMoreToolCalls && toolRound < MAX_TOOL_ROUNDS) {
// Get AI response with tools
const response = await chat(env, [systemMessage, ...session.messages], {
provider: (session.provider || 'cloudflare') as 'cloudflare' | 'openai' | 'anthropic' | 'google-ai-studio',
model: session.model || '@cf/meta/llama-4-scout-17b-16e-instruct',
tools: toolsMetadata,
});
// Handle tool calls
if (response.toolCalls && response.toolCalls.length > 0) {
toolRound++;
// IMPORTANT: Add assistant message WITH tool_calls first
// OpenAI requires tool results to follow an assistant message containing tool_calls
await addSessionMessage(env, session, {
role: 'assistant',
content: response.content || '',
toolCalls: response.toolCalls,
timestamp: Date.now(),
});
// Stream any content that came with the tool calls
if (response.content) {
controller.enqueue(encoder.encode(formatSSE('content', {
content: response.content,
})));
}
for (const toolCall of response.toolCalls) {
// Send tool call event
controller.enqueue(encoder.encode(formatSSE('tool_call', {
id: toolCall.id,
name: toolCall.name,
arguments: toolCall.arguments,
})));
// Execute the tool via DO RPC
const result = await executeMcpTool(
env,
toolCall.name,
toolCall.arguments
);
// Send tool result event
controller.enqueue(encoder.encode(formatSSE('tool_result', {
id: toolCall.id,
name: toolCall.name,
success: result.success,
result: result.result,
error: result.error,
})));
// Add tool result to messages for context
await addSessionMessage(env, session, {
role: 'tool',
content: result.success
? JSON.stringify(result.result)
: `Error: ${result.error}`,
toolCallId: toolCall.id,
timestamp: Date.now(),
});
}
// Continue loop to check if AI wants to call more tools
} else {
// No more tool calls - send final content response
hasMoreToolCalls = false;
if (response.content) {
controller.enqueue(encoder.encode(formatSSE('content', {
content: response.content,
})));
await addSessionMessage(env, session, {
role: 'assistant',
content: response.content,
timestamp: Date.now(),
});
}
}
}
// If we hit max rounds, get a final summary
if (toolRound >= MAX_TOOL_ROUNDS) {
const finalResponse = await chat(env, [systemMessage, ...session.messages], {
provider: (session.provider || 'cloudflare') as 'cloudflare' | 'openai' | 'anthropic' | 'google-ai-studio',
model: session.model || '@cf/meta/llama-4-scout-17b-16e-instruct',
// No tools - force text response
});
if (finalResponse.content) {
controller.enqueue(encoder.encode(formatSSE('content', {
content: finalResponse.content,
})));
await addSessionMessage(env, session, {
role: 'assistant',
content: finalResponse.content,
timestamp: Date.now(),
});
}
}
// Save session metadata (messages already saved via addSessionMessage)
await saveChatSession(env, session);
// Send done event
controller.enqueue(encoder.encode(formatSSE('done', {
sessionId: session.id,
})));
controller.close();
} catch (error) {
const message = error instanceof Error ? error.message : 'Unknown error';
controller.enqueue(encoder.encode(formatSSE('error', { error: message })));
controller.close();
}
},
});
}
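// Serving sketch for handleChatMessage (hypothetical caller; the actual routing layer
// is not part of this module). The stream is already SSE-framed, so the response only
// needs standard SSE headers:
//
//   const stream = await handleChatMessage(env, session, userMessage);
//   return new Response(stream, {
//     headers: {
//       'Content-Type': 'text/event-stream',
//       'Cache-Control': 'no-cache',
//     },
//   });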
/**
* List chat sessions for an admin
*/
export async function listChatSessions(
env: Env,
adminEmail: string
): Promise<Array<{ id: string; updatedAt: number; messageCount: number }>> {
// Use D1 if enabled
if (isMemoryEnabled(env)) {
const conversations = await listConversations(env.DB!, adminEmail);
return conversations.map(c => ({
id: c.id,
updatedAt: c.updatedAt * 1000, // Convert to ms
messageCount: c.messageCount,
}));
}
  // Fallback to KV (legacy). Note: kv.list() returns at most one page of keys
  // (up to 1,000) and sessions are fetched sequentially, which is acceptable for
  // small admin session counts but is not paginated.
  const list = await env.OAUTH_KV.list({ prefix: 'chat:' });
const sessions: Array<{ id: string; updatedAt: number; messageCount: number }> = [];
for (const key of list.keys) {
const session = await env.OAUTH_KV.get(key.name, 'json') as AdminChatSession | null;
if (session && session.adminEmail === adminEmail) {
sessions.push({
id: session.id,
updatedAt: session.updatedAt,
messageCount: session.messages.length,
});
}
}
// Sort by most recent
return sessions.sort((a, b) => b.updatedAt - a.updatedAt);
}
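// Return-shape example for listChatSessions (values are illustrative):
//
//   const sessions = await listChatSessions(env, adminEmail);
//   // => [{ id: '3f2c…', updatedAt: 1734470400000, messageCount: 12 }]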
/**
* Delete a chat session
*/
export async function deleteChatSession(
env: Env,
sessionId: string
): Promise<void> {
if (isMemoryEnabled(env)) {
await deleteConversationD1(env.DB!, sessionId);
} else {
await env.OAUTH_KV.delete(`chat:${sessionId}`);
}
}