/**
* Conversation Memory (D1-backed)
*
* UUID-based conversation tracking:
* - No ID provided → server generates new UUID, returns it
* - ID provided → server loads history for that conversation
* - Unguessable UUIDs = implicit access control
*/
import type {
Conversation,
ConversationMessage,
ConversationMetadata,
ChatMessage,
ChatRole,
ToolCall,
} from '../../types';
// Default config
const DEFAULT_MAX_MESSAGES = 50;
const DEFAULT_TTL_HOURS = 168; // 7 days
/**
* Safe JSON parse with fallback
* Prevents crashes from corrupted data in D1
*/
function safeJsonParse<T>(str: string | null | undefined, fallback: T | undefined = undefined): T | undefined {
if (!str) return fallback;
try {
return JSON.parse(str) as T;
} catch {
console.error('Failed to parse JSON:', str.slice(0, 100));
return fallback;
}
}
/**
* Get or create a conversation
*/
export async function getOrCreateConversation(
db: D1Database,
conversationId?: string,
metadata?: ConversationMetadata
): Promise<Conversation> {
const now = Math.floor(Date.now() / 1000);
if (conversationId) {
// Load existing conversation
const row = await db
.prepare('SELECT id, created_at, updated_at, metadata FROM conversations WHERE id = ?')
.bind(conversationId)
.first();
if (row) {
return {
id: row.id as string,
createdAt: row.created_at as number,
updatedAt: row.updated_at as number,
metadata: safeJsonParse<ConversationMetadata>(row.metadata as string),
};
}
// If ID provided but not found, create with that ID
}
// Create new conversation
const id = conversationId || crypto.randomUUID();
await db
.prepare('INSERT INTO conversations (id, created_at, updated_at, metadata) VALUES (?, ?, ?, ?)')
.bind(id, now, now, metadata ? JSON.stringify(metadata) : null)
.run();
return { id, createdAt: now, updatedAt: now, metadata };
}
/**
* Update conversation metadata
*/
export async function updateConversationMetadata(
db: D1Database,
conversationId: string,
metadata: ConversationMetadata
): Promise<void> {
const now = Math.floor(Date.now() / 1000);
await db
.prepare('UPDATE conversations SET metadata = ?, updated_at = ? WHERE id = ?')
.bind(JSON.stringify(metadata), now, conversationId)
.run();
}
/**
* Add a message to a conversation
*/
export async function addMessage(
db: D1Database,
conversationId: string,
message: Omit<ConversationMessage, 'id' | 'createdAt'>
): Promise<ConversationMessage> {
const now = Math.floor(Date.now() / 1000);
const id = crypto.randomUUID();
// Ensure content is always a string (D1 doesn't accept objects)
const content = typeof message.content === 'string'
? message.content
: JSON.stringify(message.content);
await db
.prepare(`
INSERT INTO messages (id, conversation_id, role, content, tool_calls, tool_call_id, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`)
.bind(
id,
conversationId,
message.role,
content,
message.toolCalls ? JSON.stringify(message.toolCalls) : null,
message.toolCallId || null,
now
)
.run();
// Update conversation timestamp
await db
.prepare('UPDATE conversations SET updated_at = ? WHERE id = ?')
.bind(now, conversationId)
.run();
return {
id,
conversationId,
role: message.role,
content,
toolCalls: message.toolCalls,
toolCallId: message.toolCallId,
createdAt: now,
};
}
/**
* Add multiple messages to a conversation (batch insert)
*/
export async function addMessages(
db: D1Database,
conversationId: string,
messages: Array<Omit<ConversationMessage, 'id' | 'createdAt'>>
): Promise<ConversationMessage[]> {
const now = Math.floor(Date.now() / 1000);
const results: ConversationMessage[] = [];
// Use a transaction for batch insert
const statements = messages.map(message => {
const id = crypto.randomUUID();
// Ensure content is always a string (D1 doesn't accept objects)
const content = typeof message.content === 'string'
? message.content
: JSON.stringify(message.content);
results.push({
id,
conversationId,
role: message.role,
content,
toolCalls: message.toolCalls,
toolCallId: message.toolCallId,
createdAt: now,
});
return db
.prepare(`
INSERT INTO messages (id, conversation_id, role, content, tool_calls, tool_call_id, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`)
.bind(
id,
conversationId,
message.role,
content,
message.toolCalls ? JSON.stringify(message.toolCalls) : null,
message.toolCallId || null,
now
);
});
// Add conversation update
statements.push(
db.prepare('UPDATE conversations SET updated_at = ? WHERE id = ?').bind(now, conversationId)
);
await db.batch(statements);
return results;
}
/**
* Get recent messages for a conversation
*/
export async function getMessages(
db: D1Database,
conversationId: string,
limit?: number
): Promise<ConversationMessage[]> {
const maxMessages = limit || DEFAULT_MAX_MESSAGES;
const { results } = await db
.prepare(`
SELECT id, conversation_id, role, content, tool_calls, tool_call_id, created_at
FROM messages
WHERE conversation_id = ?
ORDER BY created_at DESC
LIMIT ?
`)
.bind(conversationId, maxMessages)
.all();
// Reverse to get chronological order
return (results || []).reverse().map(row => ({
id: row.id as string,
conversationId: row.conversation_id as string,
role: row.role as ChatRole,
content: row.content as string,
toolCalls: safeJsonParse<ToolCall[]>(row.tool_calls as string),
toolCallId: row.tool_call_id as string | undefined,
createdAt: row.created_at as number,
}));
}
/**
* Convert D1 messages to ChatMessage format for AI
*/
export function toChatMessages(messages: ConversationMessage[]): ChatMessage[] {
return messages.map(m => ({
role: m.role,
content: m.content,
toolCalls: m.toolCalls,
toolCallId: m.toolCallId,
timestamp: m.createdAt * 1000, // Convert to ms
}));
}
/**
* Delete a conversation and its messages
*/
export async function deleteConversation(
db: D1Database,
conversationId: string
): Promise<void> {
// Messages deleted via CASCADE
await db
.prepare('DELETE FROM conversations WHERE id = ?')
.bind(conversationId)
.run();
}
/**
* Delete old conversations (for cron cleanup)
*/
export async function cleanupOldConversations(
db: D1Database,
ttlHours?: number
): Promise<number> {
const ttl = ttlHours || DEFAULT_TTL_HOURS;
const cutoff = Math.floor(Date.now() / 1000) - (ttl * 60 * 60);
// Messages deleted via CASCADE
const result = await db
.prepare('DELETE FROM conversations WHERE updated_at < ?')
.bind(cutoff)
.run();
return result.meta.changes || 0;
}
/**
* List conversations (with optional user filter)
*/
export async function listConversations(
db: D1Database,
userEmail?: string,
limit = 20
): Promise<Array<Conversation & { messageCount: number }>> {
let query = `
SELECT c.id, c.created_at, c.updated_at, c.metadata,
COUNT(m.id) as message_count
FROM conversations c
LEFT JOIN messages m ON m.conversation_id = c.id
`;
const params: unknown[] = [];
if (userEmail) {
query += ` WHERE json_extract(c.metadata, '$.userEmail') = ?`;
params.push(userEmail);
}
query += ` GROUP BY c.id ORDER BY c.updated_at DESC LIMIT ?`;
params.push(limit);
const { results } = await db.prepare(query).bind(...params).all();
return (results || []).map(row => ({
id: row.id as string,
createdAt: row.created_at as number,
updatedAt: row.updated_at as number,
metadata: safeJsonParse<ConversationMetadata>(row.metadata as string),
messageCount: row.message_count as number,
}));
}
/**
* Get conversation by ID
*/
export async function getConversation(
db: D1Database,
conversationId: string
): Promise<Conversation | null> {
const row = await db
.prepare('SELECT id, created_at, updated_at, metadata FROM conversations WHERE id = ?')
.bind(conversationId)
.first();
if (!row) return null;
return {
id: row.id as string,
createdAt: row.created_at as number,
updatedAt: row.updated_at as number,
metadata: safeJsonParse<ConversationMetadata>(row.metadata as string),
};
}