Skip to main content
Glama

Karakeep MCP server

by karakeep-app
index.ts (4.98 kB)
import * as restate from "@restatedev/restate-sdk";
import * as restateClient from "@restatedev/restate-sdk-clients";

import type { PluginProvider } from "@karakeep/shared/plugins";
import type {
  EnqueueOptions,
  Queue,
  QueueClient,
  QueueOptions,
  Runner,
  RunnerFuncs,
  RunnerOptions,
} from "@karakeep/shared/queueing";
import logger from "@karakeep/shared/logger";

import { AdminClient } from "./admin";
import { envConfig } from "./env";
import { idProvider } from "./idProvider";
import { semaphore } from "./semaphore";
import { buildRestateService } from "./service";

/**
 * `Queue` implementation that forwards jobs to a Restate service named after
 * the queue.
 */
class RestateQueueWrapper<T> implements Queue<T> {
  constructor(
    private readonly _name: string,
    private readonly client: restateClient.Ingress,
    private readonly adminClient: AdminClient,
    public readonly opts: QueueOptions,
  ) {}

  /** Returns the queue name, which is also used as the Restate service name. */
  name(): string {
    return this._name;
  }

  /**
   * Submits `payload` to the `run` handler of the service named after this
   * queue, using the ingress send client.
   *
   * @param payload - Job payload handed to the handler.
   * @param options - Optional priority (defaults to 0), delay in
   *   milliseconds, and idempotency key.
   * @returns The invocation id reported by the ingress client.
   */
  async enqueue(
    payload: T,
    options?: EnqueueOptions,
  ): Promise<string | undefined> {
    // Shape of the remote handler as seen by the typed send client.
    interface MyService {
      run: (
        ctx: restate.Context,
        data: {
          payload: T;
          priority: number;
        },
      ) => Promise<void>;
    }

    const sendClient = this.client.serviceSendClient<MyService>({
      name: this.name(),
    });

    // A missing or zero `delayMs` means "no delay": send without one.
    const delay = options?.delayMs
      ? { milliseconds: options.delayMs }
      : undefined;

    const res = await sendClient.run(
      {
        payload,
        priority: options?.priority ?? 0,
      },
      restateClient.rpc.sendOpts({
        delay,
        idempotencyKey: options?.idempotencyKey,
      }),
    );
    return res.invocationId;
  }

  /**
   * Fetches invocation counts from the admin client and folds them into the
   * generic queue stats shape:
   * `pending` = pending + ready,
   * `pending_retry` = backing-off + paused + suspended,
   * `running` is passed through, and `failed` is always reported as 0.
   */
  async stats(): Promise<{
    pending: number;
    pending_retry: number;
    running: number;
    failed: number;
  }> {
    const res = await this.adminClient.getStats(this.name());
    return {
      pending: res.pending + res.ready,
      pending_retry: res["backing-off"] + res.paused + res.suspended,
      running: res.running,
      failed: 0,
    };
  }

  /**
   * Not supported by this backend.
   *
   * @throws Error always.
   */
  async cancelAllNonRunning(): Promise<number> {
    throw new Error("Method not implemented.");
  }
}

/**
 * `Runner` implementation that only carries a Restate service definition.
 * `run` and `stop` do nothing here; the definitions are served collectively
 * by `RestateQueueClient.start`.
 */
class RestateRunnerWrapper<T> implements Runner<T> {
  constructor(
    private readonly wf: restate.ServiceDefinition<
      string,
      {
        run: (ctx: restate.Context, data: T) => Promise<void>;
      }
    >,
  ) {}

  async run(): Promise<void> {
    // No-op for restate
  }

  async stop(): Promise<void> {
    // No-op for restate
  }

  /**
   * Not supported by this backend.
   *
   * @throws Error always.
   */
  async runUntilEmpty(): Promise<void> {
    throw new Error("Method not implemented.");
  }

  /** The wrapped service definition, as passed to `restate.serve`. */
  get def(): restate.ServiceDefinition<string, unknown> {
    return this.wf;
  }
}

/**
 * `QueueClient` backed by Restate: keeps a registry of queues and runner
 * services by name and serves the registered services from `start`.
 */
class RestateQueueClient implements QueueClient {
  private readonly client: restateClient.Ingress;
  private readonly adminClient: AdminClient;
  private readonly queues = new Map<string, RestateQueueWrapper<unknown>>();
  private readonly services = new Map<string, RestateRunnerWrapper<unknown>>();

  constructor() {
    this.client = restateClient.connect({
      url: envConfig.RESTATE_INGRESS_ADDR,
    });
    this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR);
  }

  async prepare(): Promise<void> {
    // No-op for restate
  }

  /**
   * Serves every registered runner service, plus the `semaphore` and
   * `idProvider` services, and logs the port that `restate.serve` returned.
   * Falls back to port 0 when `RESTATE_LISTEN_PORT` is unset.
   */
  async start(): Promise<void> {
    const runnerDefs = [...this.services.values()].map((svc) => svc.def);

    const port = await restate.serve({
      port: envConfig.RESTATE_LISTEN_PORT ?? 0,
      services: [...runnerDefs, semaphore, idProvider],
      identityKeys: envConfig.RESTATE_PUB_KEY
        ? [envConfig.RESTATE_PUB_KEY]
        : undefined,
      logger: (meta, msg) => {
        // Entries carrying an invocation context are dropped; only the
        // remaining SDK messages are forwarded to the app logger.
        if (!meta.context) {
          logger.log(meta.level, `[restate] ${msg}`);
        }
      },
    });
    logger.info(`Restate listening on port ${port}`);
  }

  /**
   * Registers and returns a new queue.
   *
   * @throws Error if a queue with the same name was already created.
   */
  createQueue<T>(name: string, opts: QueueOptions): Queue<T> {
    if (this.queues.has(name)) {
      throw new Error(`Queue ${name} already exists`);
    }
    const wrapper = new RestateQueueWrapper<T>(
      name,
      this.client,
      this.adminClient,
      opts,
    );
    this.queues.set(name, wrapper);
    return wrapper;
  }

  /**
   * Builds the Restate service for `queue` and registers it under the queue's
   * name so that `start` serves it.
   *
   * @throws Error if a runner was already created for this queue.
   */
  createRunner<T>(
    queue: Queue<T>,
    funcs: RunnerFuncs<T>,
    opts: RunnerOptions<T>,
  ): Runner<T> {
    const name = queue.name();
    if (this.services.has(name)) {
      throw new Error(`Runner for queue ${name} already exists`);
    }
    const svc = new RestateRunnerWrapper<T>(
      buildRestateService(queue, funcs, opts, queue.opts),
    );
    this.services.set(name, svc);
    return svc;
  }

  async shutdown(): Promise<void> {
    // No-op for restate
  }
}

/**
 * Plugin entry point that lazily creates a single `RestateQueueClient` and
 * returns the same instance on later calls.
 */
export class RestateQueueProvider implements PluginProvider<QueueClient> {
  private client: QueueClient | null = null;

  /** True when `RESTATE_LISTEN_PORT` is set in the environment config. */
  static isConfigured(): boolean {
    return envConfig.RESTATE_LISTEN_PORT !== undefined;
  }

  /** Returns the shared client, constructing it on first use. */
  async getClient(): Promise<QueueClient | null> {
    if (!this.client) {
      this.client = new RestateQueueClient();
    }
    return this.client;
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/karakeep-app/karakeep'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.