import { Hono } from 'hono';
import { cors } from 'hono/cors';
import { logger } from 'hono/logger';
import {
GeminiClient,
type CacheStorage,
type CacheMetadata,
type CacheListItem,
type UsageLogger,
type UsageEntry,
type UsageStats,
type UsageOperation,
MnemoConfigSchema,
calculateCost,
UrlAdapter,
} from '@mnemo/core';
import { MnemoMCPServer, toolDefinitions } from '@mnemo/mcp-server';
// Env interface is defined in worker-configuration.d.ts (generated by wrangler types)
// Create app with bindings type
const app = new Hono<{ Bindings: Env }>();
// Middleware
app.use('*', cors());
app.use('*', logger());
// Rate limiting store (in-memory, resets on Worker restart)
// For production, consider using KV or Durable Objects for persistence
interface RateLimitEntry {
count: number;
resetAt: number;
}
const rateLimitStore = new Map<string, RateLimitEntry>();
/**
* Rate limiting middleware
* Limits requests per IP address to prevent abuse
* @param maxRequests - Maximum requests allowed per window
* @param windowMs - Time window in milliseconds (default: 60000 = 1 minute)
*/
const rateLimit = (maxRequests: number = 30, windowMs: number = 60000) => {
return async (c: any, next: any) => {
// Get client IP from CF headers
const ip = c.req.header('CF-Connecting-IP') || c.req.header('X-Forwarded-For') || 'unknown';
const now = Date.now();
const entry = rateLimitStore.get(ip);
    // Evict expired entries once the store grows large, to bound memory use
if (rateLimitStore.size > 10000) {
for (const [key, value] of rateLimitStore.entries()) {
if (value.resetAt < now) {
rateLimitStore.delete(key);
}
}
}
if (!entry || entry.resetAt < now) {
// New window
rateLimitStore.set(ip, {
count: 1,
resetAt: now + windowMs,
});
return await next();
}
    if (entry.count >= maxRequests) {
      const resetIn = Math.ceil((entry.resetAt - now) / 1000);
      // Surface the standard Retry-After header alongside the JSON body
      return c.json({
        error: 'Rate limit exceeded',
        message: `Too many requests. Please try again in ${resetIn} seconds.`,
        retryAfter: resetIn,
      }, 429, { 'Retry-After': String(resetIn) });
    }
// Increment counter
entry.count++;
return await next();
};
};
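// A persistent variant (sketch only): back the counters with Workers KV so limits survive
// isolate restarts. Counts are approximate because KV is eventually consistent and writes to a
// single key are throttled; Durable Objects are the option for strict, cross-colo limits.
// Assumes a hypothetical KV binding named RATE_LIMIT_KV declared in wrangler config and Env.
//
//   const kvRateLimit = (maxRequests = 30, windowMs = 60000) => {
//     return async (c: any, next: any) => {
//       const ip = c.req.header('CF-Connecting-IP') || 'unknown';
//       // Fixed-window key: one counter per IP per time window
//       const key = `rl:${ip}:${Math.floor(Date.now() / windowMs)}`;
//       const count = Number((await c.env.RATE_LIMIT_KV.get(key)) ?? '0') + 1;
//       // expirationTtl must be at least 60 seconds for Workers KV
//       await c.env.RATE_LIMIT_KV.put(key, String(count), { expirationTtl: Math.max(60, (windowMs / 1000) * 2) });
//       if (count > maxRequests) {
//         return c.json({ error: 'Rate limit exceeded' }, 429);
//       }
//       return await next();
//     };
//   };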
// Authentication middleware factory.
// Returns 401 when MNEMO_AUTH_TOKEN is configured and the request lacks a valid Bearer token.
// If no token is configured, unauthenticated access is allowed (backwards compatible).
const requireAuth = () => {
return async (c: any, next: any) => {
const authToken = c.env.MNEMO_AUTH_TOKEN;
// If no auth token configured, allow access
if (!authToken) {
return await next();
}
// Auth token is configured, validate request
const header = c.req.header('Authorization');
if (!header) {
return c.json({
error: 'Unauthorized',
message: 'Missing Authorization header. Use: Authorization: Bearer <token>'
}, 401);
}
const token = header.replace(/^Bearer\s+/i, '');
if (token !== authToken) {
return c.json({
error: 'Unauthorized',
message: 'Invalid authentication token'
}, 401);
}
// Valid token, proceed
return await next();
};
};
// ============================================================================
// Routes
// ============================================================================
// Health check
app.get('/health', (c) => {
return c.json({
status: 'ok',
service: 'mnemo',
version: '0.1.0',
environment: c.env.ENVIRONMENT,
});
});
// Service info
app.get('/', (c) => {
return c.json({
name: 'mnemo',
version: '0.1.0',
description: 'Extended memory for AI assistants via Gemini context caching',
endpoints: {
health: 'GET /health',
tools: 'GET /tools',
mcp: 'POST /mcp',
},
});
});
// List available tools
app.get('/tools', (c) => {
return c.json({ tools: toolDefinitions });
});
// MCP protocol endpoint (protected with auth and rate limiting)
app.post('/mcp', rateLimit(30, 60000), requireAuth(), async (c) => {
  const server = createMCPServer(c.env);
  // Parse the JSON-RPC body first so malformed JSON maps to a parse error (-32700)
  let request;
  try {
    request = await c.req.json();
  } catch {
    return c.json({
      jsonrpc: '2.0',
      id: null,
      error: { code: -32700, message: 'Parse error' },
    }, 400);
  }
  // Unexpected failures while handling the request map to an internal error (-32603)
  try {
    const response = await server.handleRequest(request);
    return c.json(response);
  } catch (error) {
    return c.json({
      jsonrpc: '2.0',
      id: request?.id ?? null,
      error: { code: -32603, message: 'Internal error' },
    }, 500);
  }
});
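// Example call (sketch; <worker-host> and <token> are placeholders, and the Authorization
// header is only needed when MNEMO_AUTH_TOKEN is configured):
//
//   curl -X POST https://<worker-host>/mcp \
//     -H 'Content-Type: application/json' \
//     -H 'Authorization: Bearer <token>' \
//     -d '{"jsonrpc":"2.0","id":1,"method":"tools/list"}'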
// Direct tool invocation (convenience endpoints, protected with auth and rate limiting)
app.post('/tools/:toolName', rateLimit(30, 60000), requireAuth(), async (c) => {
const toolName = c.req.param('toolName');
const server = createMCPServer(c.env);
try {
const args = await c.req.json();
const response = await server.handleRequest({
jsonrpc: '2.0',
id: 1,
method: 'tools/call',
params: {
name: toolName,
arguments: args,
},
});
// Extract result from MCP response
if ('result' in response && response.result) {
return c.json(response.result);
}
if ('error' in response && response.error) {
return c.json({ error: response.error.message }, 400);
}
return c.json(response);
} catch (error) {
return c.json({ error: 'Invalid request' }, 400);
}
});
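// Example direct invocation (sketch; substitute a tool name reported by GET /tools and pass
// that tool's arguments as the JSON body):
//
//   curl -X POST https://<worker-host>/tools/<toolName> \
//     -H 'Content-Type: application/json' \
//     -H 'Authorization: Bearer <token>' \
//     -d '{"...": "tool arguments"}'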
// ============================================================================
// Helpers
// ============================================================================
function createMCPServer(env: Env): MnemoMCPServer {
const config = MnemoConfigSchema.parse({
geminiApiKey: env.GEMINI_API_KEY,
});
const geminiClient = new GeminiClient(config);
const storage = new D1CacheStorage(env.DB);
const usageLogger = new D1UsageLogger(env.DB);
  // Free-plan Workers allow 50 subrequests per request; cap at 40 to leave headroom
const urlAdapter = new UrlAdapter({ maxSubrequests: 40 });
return new MnemoMCPServer({
geminiClient,
storage,
usageLogger,
urlAdapter,
});
}
// ============================================================================
// D1 Storage Implementation
// ============================================================================
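// Assumed D1 schema for the `caches` table, inferred from the queries below (the column
// names and ON CONFLICT(alias) imply a UNIQUE alias, and created_at is never written, so a
// default is assumed; the actual migration lives outside this file):
//
//   CREATE TABLE caches (
//     id TEXT PRIMARY KEY,
//     alias TEXT UNIQUE NOT NULL,
//     gemini_cache_name TEXT NOT NULL,
//     source TEXT NOT NULL,
//     token_count INTEGER NOT NULL,
//     model TEXT,
//     created_at TEXT DEFAULT CURRENT_TIMESTAMP,
//     expires_at TEXT NOT NULL
//   );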
class D1CacheStorage implements CacheStorage {
constructor(private db: D1Database) {}
async save(metadata: CacheMetadata): Promise<void> {
await this.db
.prepare(
`INSERT INTO caches (id, alias, gemini_cache_name, source, token_count, model, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(alias) DO UPDATE SET
gemini_cache_name = excluded.gemini_cache_name,
source = excluded.source,
token_count = excluded.token_count,
model = excluded.model,
expires_at = excluded.expires_at`
)
.bind(
crypto.randomUUID(),
metadata.alias,
metadata.name,
metadata.source,
metadata.tokenCount,
metadata.model ?? null,
metadata.expiresAt.toISOString()
)
.run();
}
async getByAlias(alias: string): Promise<CacheMetadata | null> {
const result = await this.db
.prepare('SELECT * FROM caches WHERE alias = ?')
.bind(alias)
.first<{
id: string;
alias: string;
gemini_cache_name: string;
source: string;
token_count: number;
model: string | null;
created_at: string;
expires_at: string;
}>();
if (!result) return null;
return {
name: result.gemini_cache_name,
alias: result.alias,
tokenCount: result.token_count,
createdAt: new Date(result.created_at),
expiresAt: new Date(result.expires_at),
source: result.source,
model: result.model ?? undefined,
};
}
async getByName(name: string): Promise<CacheMetadata | null> {
const result = await this.db
.prepare('SELECT * FROM caches WHERE gemini_cache_name = ?')
.bind(name)
.first<{
id: string;
alias: string;
gemini_cache_name: string;
source: string;
token_count: number;
model: string | null;
created_at: string;
expires_at: string;
}>();
if (!result) return null;
return {
name: result.gemini_cache_name,
alias: result.alias,
tokenCount: result.token_count,
createdAt: new Date(result.created_at),
expiresAt: new Date(result.expires_at),
source: result.source,
model: result.model ?? undefined,
};
}
async list(): Promise<CacheListItem[]> {
const results = await this.db
.prepare('SELECT alias, token_count, expires_at, source FROM caches ORDER BY created_at DESC')
.all<{
alias: string;
token_count: number;
expires_at: string;
source: string;
}>();
return (results.results ?? []).map((row) => ({
alias: row.alias,
tokenCount: row.token_count,
expiresAt: new Date(row.expires_at),
source: row.source,
}));
}
async deleteByAlias(alias: string): Promise<boolean> {
const result = await this.db
.prepare('DELETE FROM caches WHERE alias = ?')
.bind(alias)
.run();
return (result.meta?.changes ?? 0) > 0;
}
async update(alias: string, updates: Partial<CacheMetadata>): Promise<void> {
const sets: string[] = [];
const values: unknown[] = [];
if (updates.expiresAt) {
sets.push('expires_at = ?');
values.push(updates.expiresAt.toISOString());
}
if (updates.tokenCount !== undefined) {
sets.push('token_count = ?');
values.push(updates.tokenCount);
}
if (sets.length === 0) return;
values.push(alias);
await this.db
.prepare(`UPDATE caches SET ${sets.join(', ')} WHERE alias = ?`)
.bind(...values)
.run();
}
}
// ============================================================================
// D1 Usage Logger Implementation
// ============================================================================
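// Assumed D1 schema for the `usage_logs` table, inferred from the queries below (created_at
// is not set on insert, so a default is assumed; the actual migration lives outside this file):
//
//   CREATE TABLE usage_logs (
//     cache_id TEXT NOT NULL,
//     operation TEXT NOT NULL,
//     tokens_used INTEGER NOT NULL,
//     cached_tokens_used INTEGER NOT NULL,
//     created_at TEXT DEFAULT CURRENT_TIMESTAMP
//   );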
class D1UsageLogger implements UsageLogger {
constructor(private db: D1Database) {}
async log(entry: Omit<UsageEntry, 'createdAt'>): Promise<void> {
await this.db
.prepare(
`INSERT INTO usage_logs (cache_id, operation, tokens_used, cached_tokens_used)
VALUES (?, ?, ?, ?)`
)
.bind(entry.cacheId, entry.operation, entry.tokensUsed, entry.cachedTokensUsed)
.run();
}
async getStats(cacheId?: string): Promise<UsageStats> {
const whereClause = cacheId ? 'WHERE cache_id = ?' : '';
// Get totals
const totalsStmt = this.db.prepare(
`SELECT
COUNT(*) as total_ops,
COALESCE(SUM(tokens_used), 0) as total_tokens,
COALESCE(SUM(cached_tokens_used), 0) as total_cached
FROM usage_logs ${whereClause}`
);
const totals = cacheId
? await totalsStmt.bind(cacheId).first<{ total_ops: number; total_tokens: number; total_cached: number }>()
: await totalsStmt.first<{ total_ops: number; total_tokens: number; total_cached: number }>();
// Get breakdown by operation
const byOpStmt = this.db.prepare(
`SELECT
operation,
COUNT(*) as count,
COALESCE(SUM(tokens_used), 0) as tokens_used,
COALESCE(SUM(cached_tokens_used), 0) as cached_tokens_used
FROM usage_logs ${whereClause}
GROUP BY operation`
);
const byOpResults = cacheId
? await byOpStmt.bind(cacheId).all<{
operation: string;
count: number;
tokens_used: number;
cached_tokens_used: number;
}>()
: await byOpStmt.all<{
operation: string;
count: number;
tokens_used: number;
cached_tokens_used: number;
}>();
const byOperation: Record<UsageOperation, { count: number; tokensUsed: number; cachedTokensUsed: number }> = {
load: { count: 0, tokensUsed: 0, cachedTokensUsed: 0 },
query: { count: 0, tokensUsed: 0, cachedTokensUsed: 0 },
evict: { count: 0, tokensUsed: 0, cachedTokensUsed: 0 },
refresh: { count: 0, tokensUsed: 0, cachedTokensUsed: 0 },
};
for (const row of byOpResults.results ?? []) {
const op = row.operation as UsageOperation;
if (op in byOperation) {
byOperation[op] = {
count: row.count,
tokensUsed: row.tokens_used,
cachedTokensUsed: row.cached_tokens_used,
};
}
}
const totalTokens = totals?.total_tokens ?? 0;
const totalCached = totals?.total_cached ?? 0;
return {
totalOperations: totals?.total_ops ?? 0,
totalTokensUsed: totalTokens,
totalCachedTokensUsed: totalCached,
estimatedCost: calculateCost(totalTokens, totalCached),
byOperation,
};
}
async getRecent(limit = 100): Promise<UsageEntry[]> {
const results = await this.db
.prepare(
`SELECT cache_id, operation, tokens_used, cached_tokens_used, created_at
FROM usage_logs
ORDER BY created_at DESC
LIMIT ?`
)
.bind(limit)
.all<{
cache_id: string;
operation: string;
tokens_used: number;
cached_tokens_used: number;
created_at: string;
}>();
return (results.results ?? []).map((row) => ({
cacheId: row.cache_id,
operation: row.operation as UsageOperation,
tokensUsed: row.tokens_used,
cachedTokensUsed: row.cached_tokens_used,
createdAt: new Date(row.created_at),
}));
}
}
export default app;