
Convex MCP server

Official
by get-convex
executor.ts (24.9 kB)
import { Console } from "node:console";
import { Writable } from "node:stream";
import { performance } from "node:perf_hooks";
import fs from "node:fs";
import path from "node:path";
import { createHash } from "node:crypto";
import { inspect } from "node:util";
import { createRequire } from "node:module";
import { pathToFileURL } from "node:url";
import { UserIdentity } from "convex/server";
import {
  CanonicalizedModulePath,
  FunctionName,
  isConvexAction,
  UdfPath,
  EnvironmentVariable,
} from "./convex";
import {
  FrameData,
  extractErrorMessage,
  registerPrepareStackTrace,
} from "./errors";
import { findLineNumbers } from "./analyze";
import { Syscalls, SyscallsImpl } from "./syscalls";
import { SourcePackage, maybeDownloadAndLinkPackages } from "./source_package";
import { buildDeps, BuildDepsRequest } from "./build_deps";
import { ConvexError, JSONValue } from "convex/values";
import { log, logDebug, logDurationMs } from "./log";
import { AsyncLocalStorage } from "node:async_hooks";

// Small hack to detect if we're running in the dynamic or static lambda.
const AWS_LAMBDA_EXECUTOR_TYPE = (
  process.env.AWS_LAMBDA_FUNCTION_NAME ?? ""
).endsWith("-d")
  ? "DYNAMIC"
  : "STATIC";

const AWS_LAMBDA_FUNCTION_MEMORY_SIZE = parseInt(
  process.env.AWS_LAMBDA_FUNCTION_MEMORY_SIZE ?? "512",
  10,
);

// We allocate extra memory (4GB) for dynamic lambdas, but don't want to charge
// customers for this implementation detail. We report 512MB (which may be an
// undercharge) but this should only be a temporary state while the static
// lambda deploys.
const AWS_LAMBDA_BILLED_MEMORY_SIZE =
  AWS_LAMBDA_EXECUTOR_TYPE === "DYNAMIC"
    ? 512
    : AWS_LAMBDA_FUNCTION_MEMORY_SIZE;

// When we bundle commonJS modules as ESM with esbuild, the bundled code might
// still use `require`, exports, module, __dirname or __filename despite being
// in ESM.
//
// We inject these into the environment to make this code work. When actual CJS
// modules are invoked, Node overrides these globals with the correct globals
// for that module, so we don't need to worry about these globals messing with
// CJS runtime globals.
// See https://nodejs.org/api/modules.html#the-module-wrapper for details on
// how Node does this.
export function setupGlobals(modulePath: string) {
  // Set `require` to use a module resolution algorithm relative to the module,
  // instead of this executor package, so external deps can be used.
  globalThis.require = createRequire(modulePath);
  globalThis.exports = exports;
  globalThis.module = module;
  // TODO(presley): Currently, __filename and __dirname are /var/task/aws_lambda.cjs
  // and /var/task respectively. Once we use `npm install` to install node_modules
  // instead of bundling them, we can explore dropping those or making them accurate.
  globalThis.__dirname = __dirname;
  globalThis.__filename = __filename;
}

let numInvocations = 0;
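
// --- Illustrative sketch (not part of the original executor.ts) ---
// Why setupGlobals() exists: esbuild can emit ESM output for code that was
// written as CommonJS, and that output may still reference CJS globals. A
// bundled dependency could contain something like the function below, which
// only works because setupGlobals() installed `require` (scoped to the user's
// module via createRequire) and `__dirname`/`__filename` first. The package
// name and file name here are hypothetical.
function exampleBundledCjsUsage() {
  // Resolves relative to the user's module rather than this executor package.
  const lodash = require("lodash");
  const assetPath = path.join(__dirname, "asset.txt");
  return { lodash, assetPath };
}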

export function setEnvironmentVariables(envs: EnvironmentVariable[]) {
  // AWS Lambda populates a number of environment variables, like Lambda version,
  // handler name, session, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, etc. We
  // don't want to expose any of that. Only expose variables that are common
  // between local Node.js and AWS Lambda.
  const allowedEnvs = ["PATH", "PWD", "LANG", "NODE_PATH", "TZ", "UTC"];
  const sanitized: { [name: string]: string } = {};
  for (const name of allowedEnvs) {
    const value = process.env[name];
    if (value !== undefined) {
      sanitized[name] = value;
    }
  }
  process.env = sanitized;
  // Set the user defined environment variables
  envs.sort((a, b) => a.name.localeCompare(b.name));
  for (const e of envs) {
    process.env[e.name] = e.value;
  }
  // Compute a hash based on the user defined environment variables.
  return createHash("md5").update(JSON.stringify(envs)).digest("hex");
}

function unhandledRejectionHandler(
  responseStream: Writable,
  event: "unhandledRejection" | "uncaughtException",
  e: unknown,
) {
  // Respond with a user error.
  log(`handling ${event}`);
  const response = {
    type: "error",
    message: `${event}: ${extractErrorMessage(e)}`,
    frames: [],
    syscallTrace: (globalSyscalls.getStore() as SyscallsImpl | null)
      ?.syscallTrace,
    memoryAllocatedMb: AWS_LAMBDA_FUNCTION_MEMORY_SIZE,
  };
  if (e instanceof Error) {
    e.stack; // calls overridden prepareStackTrace
    if ((e as any).__frameData) {
      response.frames = JSON.parse((e as any).__frameData);
    }
  }
  const json = JSON.stringify(response);
  log(json);
  // Use `.end()` to make sure that no other finish message makes it into the
  // stream, then exit the process to prevent any ongoing async work from
  // leaking into the next invocation.
  responseStream.on("finish", () => process.exit(1));
  responseStream.end(json);
}

export async function invoke(
  request: ExecuteRequest | AnalyzeRequest | BuildDepsRequest,
  responseStream: Writable,
): Promise<number> {
  process.removeAllListeners("unhandledRejection");
  process.removeAllListeners("uncaughtException");
  process.on("unhandledRejection", (e: unknown) =>
    unhandledRejectionHandler(responseStream, "unhandledRejection", e),
  );
  process.on("uncaughtException", (e: unknown) =>
    unhandledRejectionHandler(responseStream, "uncaughtException", e),
  );
  const start = performance.now();
  numInvocations += 1;
  logDebug(`Environment numInvocations=${numInvocations}`);
  const result = await globalConsoleState.run(
    defaultConsoleState(),
    async () => {
      const devConsole = setupConsole(responseStream);
      return await globalDevConsole.run(devConsole, async () => {
        let result;
        if (request.type === "execute") {
          result = await execute(request);
        } else if (request.type === "analyze") {
          result = await analyze(request);
        } else if (request.type === "build_deps") {
          result = await buildDeps(request);
        } else {
          throw new Error(`Unknown request type ${request}`);
        }
        return result;
      });
    },
  );
  logDurationMs("Total invocation time", start);
  logDebug(`Memory allocated: ${AWS_LAMBDA_FUNCTION_MEMORY_SIZE}MB`);
  if (
    AWS_LAMBDA_EXECUTOR_TYPE === "DYNAMIC" &&
    AWS_LAMBDA_BILLED_MEMORY_SIZE !== AWS_LAMBDA_FUNCTION_MEMORY_SIZE
  ) {
    logDebug(
      `Dynamic executor used: reporting ${AWS_LAMBDA_BILLED_MEMORY_SIZE}MB of billed memory`,
    );
  }
  responseStream.write(JSON.stringify(result));
  return numInvocations;
}
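
// --- Illustrative sketch (not part of the original executor.ts) ---
// What setEnvironmentVariables() above does: process.env is replaced with an
// allowlisted subset (PATH, PWD, LANG, NODE_PATH, TZ, UTC) plus the
// user-defined variables, and an MD5 hash of the user-defined variables is
// returned. executeInner() later appends that hash as a query parameter so a
// module gets re-imported whenever its environment changes. The variable
// names and values here are made up.
function exampleEnvHash(): string {
  const userEnvs: EnvironmentVariable[] = [
    { name: "LOG_LEVEL", value: "debug" },
    { name: "MY_API_KEY", value: "secret" },
  ];
  // After this call, process.env contains only the allowlisted variables that
  // were present, plus LOG_LEVEL and MY_API_KEY.
  const envHash = setEnvironmentVariables(userEnvs);
  return envHash; // changes whenever the user-defined variables change
}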

export type ExecuteRequest = {
  type: "execute";
  // The AWS lambda request id unique to this particular UDF. Unlike the ID in
  // the ExecutionContext, it's unique to this particular request and never
  // re-used.
  // TODO(CX-5733): Rename this in callers and migrate.
  requestId: string;
  sourcePackage: SourcePackage;
  udfPath: UdfPath;
  args: string;
  backendAddress: string;
  backendCallbackToken: string;
  authHeader: string | null;
  userIdentity: UserIdentity | null;
  environmentVariables: EnvironmentVariable[];
  timeoutSecs: number;
  npmVersion: string | null;
  executionContext: ExecutionContext;
  encodedParentTrace: string | null;
};

export type ExecutionContext = {
  requestId: string;
  executionId: string | undefined;
  isRoot: boolean | undefined;
  parentScheduledJob: string | null;
  parentScheduledJobComponentId: string | null;
};

export type ExecuteResponseInner =
  | {
      type: "success";
      udfReturn: string;
      logLines: string[];
      udfTimeMs: number;
      importTimeMs: number;
    }
  | {
      type: "error";
      message: string;
      name: string;
      data?: string;
      frames?: FrameData[];
      logLines: string[];
      udfTimeMs?: number;
      importTimeMs?: number;
    };

export type SyscallStats = {
  invocations: number;
  errors: number;
  totalDurationMs: number;
};

export type ExecuteResponse = ExecuteResponseInner & {
  // The number of invocations in the lifetime of executor environment. 1 implies
  // this is the first request in that environment.
  numInvocations: number;
  // Time spent downloading the package in seconds.
  downloadTimeMs?: number;
  // Time spent compiling the package in seconds.
  importTimeMs?: number;
  // Total time spent in the executor
  totalExecutorTimeMs: number;
  syscallTrace: Record<string, SyscallStats>;
  // The amount of memory allocated to the executor environment. This is
  // constant for the lifetime of the environment.
  memoryAllocatedMb: number;
};

export async function execute(
  request: ExecuteRequest,
): Promise<ExecuteResponse> {
  const start = performance.now();
  // Download missing packages and do any necessary linking
  const local = await maybeDownloadAndLinkPackages(request.sourcePackage);
  const downloadTimeMs = logDurationMs("downloadTime", start);
  const syscalls = new SyscallsImpl(
    request.udfPath,
    request.requestId,
    request.backendAddress,
    request.backendCallbackToken,
    request.authHeader,
    request.userIdentity,
    request.executionContext,
    request.encodedParentTrace,
  );
  let innerResult: ExecuteResponseInner;
  try {
    if (!local.modules.has(request.udfPath.canonicalizedPath)) {
      throw new Error(
        `Couldn't find module source for ${request.udfPath.canonicalizedPath}`,
      );
    }
    innerResult = await executeInner(
      request.requestId,
      local.dir,
      request.udfPath.canonicalizedPath,
      request.udfPath.function ?? "default",
      request.args,
      request.environmentVariables,
      request.timeoutSecs,
      syscalls,
    );
  } catch (e: any) {
    innerResult = {
      type: "error",
      message: extractErrorMessage(e),
      name: e.name,
      // Log lines should be streamed, but send an empty array for backwards compatibility
      logLines: [],
    };
  }
  const totalExecutorTimeMs = logDurationMs("totalExecutorTime", start);
  return {
    ...innerResult,
    numInvocations,
    downloadTimeMs,
    totalExecutorTimeMs,
    syscallTrace: syscalls.syscallTrace,
    memoryAllocatedMb: AWS_LAMBDA_BILLED_MEMORY_SIZE,
  };
}

export async function executeInner(
  lambdaExecuteId: string,
  dir: string,
  relPath: string,
  name: FunctionName,
  args: string,
  environmentVariables: EnvironmentVariable[],
  timeoutSecs: number,
  syscalls: Syscalls,
): Promise<ExecuteResponseInner> {
  logDebug(`Executing ${relPath}:${name} from ${dir}`);
  const modulesDir = path.join(dir, "modules");
  registerPrepareStackTrace(modulesDir);
  const start = performance.now();
  // We have to reevaluate the module if the envs change since they can be used
  // in global scope. We add them as query argument to achieve this behavior.
  const envHash = setEnvironmentVariables(environmentVariables);
  setupGlobals(`${modulesDir}/${relPath}`);
  const module = await import(
    path.join(modulesDir, `${relPath}?envHash=${envHash}`)
  );
  const importTimeMs = logDurationMs("importTimeMs", start);
  const userFunction = module[name];
  if (!userFunction) {
    throw new Error(`Couldn't find action \`${name}\` in \`${relPath}\``);
  }
  if (!isConvexAction(userFunction)) {
    throw new Error(
      `\`${name}\` wasn't registered as a Convex action in \`${relPath}\``,
    );
  }
  const invoke = userFunction.invokeAction;
  const startExecute = performance.now();
  // Use this symbol to determine if the result of the Promise.race
  // was a timeout or not.
  const timeoutError = Symbol();
  let udfReturn: string | symbol;
  try {
    let timer: NodeJS.Timeout | null = null;
    const timeout = new Promise<symbol>((res) => {
      timer = setTimeout(() => res(timeoutError), timeoutSecs * 1000);
    });
    udfReturn = await globalSyscalls.run(syscalls, () => {
      return Promise.race<string | symbol>([
        invoke(lambdaExecuteId, args),
        timeout,
      ]).finally(() => {
        // Always clear the timeout after the promise is settled.
        // There shouldn't be a race because the timeout promise is created first.
        // But it's also fine because with Promise.race the timeout promise should be swallowed
        timer && clearTimeout(timer);
      });
    });
  } catch (e: any) {
    // Accessing `e.stack` is important! Without it e.__frameData
    // is not generated!
    e?.stack;
    const udfTimeMs = logDurationMs("executeUdf", startExecute);
    return {
      type: "error",
      message: e?.message ?? "",
      name: e?.name ?? "",
      data: getConvexErrorData(e),
      frames: e?.__frameData ? JSON.parse(e.__frameData) : [],
      // Log lines should be streamed, but send an empty array for backwards compatibility
      logLines: [],
      udfTimeMs,
      importTimeMs,
    };
  }
  if (udfReturn === timeoutError) {
    throw new Error(
      `Action \`${name}\` execution timed out (maximum duration ${timeoutSecs}s)`,
    );
  }
  if (typeof udfReturn !== "string") {
    throw new Error(
      // Need to cast to a string here to make TS happy.
      `Action \`${name}\` did not return a string (returned \`${String(
        udfReturn,
      )}\`)`,
    );
  }
  syscalls.assertNoPendingSyscalls();
  const udfTimeMs = logDurationMs("executeUdf", startExecute);
  return {
    type: "success",
    udfReturn,
    // Log lines should be streamed, but send an empty array for backwards compatibility
    logLines: [],
    udfTimeMs,
    importTimeMs,
  };
}

// Keep in sync with registration_impl
function getConvexErrorData(thrown: unknown) {
  if (
    typeof thrown === "object" &&
    thrown !== null &&
    Symbol.for("ConvexError") in thrown
  ) {
    // At this point data has already been serialized
    // in `invokeAction`.
    return (thrown as ConvexError<string>).data;
  }
  return undefined;
}
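
// --- Illustrative sketch (not part of the original executor.ts) ---
// Two details of executeInner() above worth calling out:
//
// 1. Dynamic import() caches modules by specifier, so importing
//    `${relPath}?envHash=${envHash}` forces a fresh evaluation of the module
//    (and its global scope) whenever the user's environment variables change,
//    while reusing the cached instance when they do not.
//
// 2. The UDF is raced against a timer, with a unique Symbol as the timeout
//    sentinel so it can never collide with a legitimate string return value.
//    A stripped-down version of that pattern (names local to this sketch):
async function exampleTimeoutRace(
  run: () => Promise<string>,
  timeoutSecs: number,
): Promise<string> {
  const timedOut = Symbol();
  let timer: NodeJS.Timeout | null = null;
  const timeout = new Promise<symbol>((resolve) => {
    timer = setTimeout(() => resolve(timedOut), timeoutSecs * 1000);
  });
  const result = await Promise.race<string | symbol>([run(), timeout]).finally(
    () => timer && clearTimeout(timer),
  );
  if (result === timedOut) {
    throw new Error(`execution timed out (maximum duration ${timeoutSecs}s)`);
  }
  return result as string;
}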

export type AnalyzeRequest = {
  type: "analyze";
  // The AWS lambda request id unique to this particular request.
  // TODO(CX-5733): Rename this in callers and migrate.
  requestId: string;
  sourcePackage: SourcePackage;
  environmentVariables: EnvironmentVariable[];
};

export type AnalyzeResponse =
  | {
      type: "success";
      modules: Record<CanonicalizedModulePath, AnalyzedFunctions>;
    }
  | {
      type: "error";
      message: string;
      frames?: FrameData[];
    };

export async function analyze(
  request: AnalyzeRequest,
): Promise<AnalyzeResponse> {
  setEnvironmentVariables(request.environmentVariables);
  const local = await maybeDownloadAndLinkPackages(request.sourcePackage);
  const modulesDir = path.join(local.dir, "modules");
  registerPrepareStackTrace(modulesDir);
  const modules: Record<CanonicalizedModulePath, AnalyzedFunctions> = {};
  for (const modulePath of local.modules) {
    try {
      const filePath = path.join(modulesDir, modulePath);
      modules[modulePath] = await analyzeModule(filePath);
    } catch (e: any) {
      e.stack;
      return {
        type: "error",
        message: `Failed to analyze ${modulePath}: ${extractErrorMessage(e)}`,
        frames: e.__frameData ? JSON.parse(e.__frameData) : [],
      };
    }
  }
  return { type: "success", modules };
}

type Visibility = { kind: "public" } | { kind: "internal" };

type UdfType = "action" | "mutation" | "query" | "httpAction";

export type AnalyzedFunctions = Array<{
  name: string;
  lineno: number;
  udfType: UdfType;
  visibility: Visibility | null;
  args: JSONValue | null;
  returns: JSONValue | null;
}>;

async function analyzeModule(filePath: string): Promise<AnalyzedFunctions> {
  setupGlobals(filePath);
  const fileUrl = pathToFileURL(filePath).href;
  const module = await import(fileUrl);
  const functions: Map<
    string,
    {
      udfType: UdfType;
      visibility: Visibility | null;
      args: JSONValue | null;
      returns: JSONValue | null;
    }
  > = new Map();
  for (const [name, value] of Object.entries(module)) {
    if (value === undefined || value === null) {
      continue;
    }
    // TODO: This is a little more permissive than our V8 layer in that we
    // don't check whether `value instanceof Function`. This is tricky here
    // since we need to use the context's `Function` for the prototype check.
    let udfType: UdfType;
    if (
      Object.prototype.hasOwnProperty.call(value, "isAction") &&
      Object.prototype.hasOwnProperty.call(value, "invokeAction")
    ) {
      udfType = "action";
    } else if (
      Object.prototype.hasOwnProperty.call(value, "isQuery") &&
      Object.prototype.hasOwnProperty.call(value, "invokeQuery")
    ) {
      udfType = "query";
    } else if (
      Object.prototype.hasOwnProperty.call(value, "isMutation") &&
      Object.prototype.hasOwnProperty.call(value, "invokeMutation")
    ) {
      udfType = "mutation";
    } else if (
      Object.prototype.hasOwnProperty.call(value, "isHttp") &&
      (Object.prototype.hasOwnProperty.call(value, "invokeHttpEndpoint") ||
        Object.prototype.hasOwnProperty.call(value, "invokeHttpAction"))
    ) {
      udfType = "httpAction";
    } else {
      continue;
    }
    const isPublic = Object.prototype.hasOwnProperty.call(value, "isPublic");
    const isInternal = Object.prototype.hasOwnProperty.call(
      value,
      "isInternal",
    );
    let args: string | null = null;
    if (
      Object.prototype.hasOwnProperty.call(value, "exportArgs") &&
      typeof (value as any).exportArgs === "function"
    ) {
      const exportedArgs = (value as any).exportArgs();
      if (typeof exportedArgs === "string") {
        args = JSON.parse(exportedArgs);
      }
    }
    let returns: string | null = null;
    if (
      Object.prototype.hasOwnProperty.call(value, "exportReturns") &&
      typeof (value as any).exportReturns === "function"
    ) {
      const exportedReturns = (value as any).exportReturns();
      if (typeof exportedReturns === "string") {
        returns = JSON.parse(exportedReturns);
      }
    }
    if (isPublic && isInternal) {
      logDebug(
        `Skipping function marked as both public and internal: ${name}`,
      );
      continue;
    } else if (isPublic) {
      functions.set(name, {
        udfType,
        visibility: { kind: "public" },
        args,
        returns,
      });
    } else if (isInternal) {
      functions.set(name, {
        udfType,
        visibility: { kind: "internal" },
        args,
        returns,
      });
    } else {
      functions.set(name, { udfType, visibility: null, args, returns });
    }
  }
  // Do an awful, regex based line match that assumes that moduleConfig.source
  // originates from esbuild since we don't have V8's
  // `Function::get_script_line_number` in Node. This was how we did this in
  // `isolate/` before #991.
  const source = fs.readFileSync(filePath, {
    encoding: "utf-8",
  });
  const lineNumbers = findLineNumbers(source, Array.from(functions.keys()));
  const analyzed = [...functions.entries()].map(([name, properties]) => {
    // Finding line numbers is best effort. We should return the analyzed
    // function even if we fail to find the exact line number.
    const lineno = lineNumbers.get(name) ?? 0;
    return {
      name,
      lineno,
      ...properties,
    };
  });
  return analyzed;
}
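
// --- Illustrative sketch (not part of the original executor.ts) ---
// analyzeModule() above recognizes UDFs purely by property sniffing on a
// module's exports. An export shaped like the hypothetical object below would
// be reported as a public query; the validator JSON returned by exportArgs is
// simplified for illustration and not the exact Convex wire format.
const exampleQueryExport = {
  isQuery: true,
  invokeQuery: async (_argsJson: string) => "null",
  isPublic: true,
  exportArgs: () => JSON.stringify({ type: "object", value: {} }),
};
// The corresponding AnalyzedFunctions entry would look roughly like:
//   { name: "myQuery", lineno: 12, udfType: "query",
//     visibility: { kind: "public" },
//     args: { type: "object", value: {} }, returns: null }
// where lineno comes from the best-effort regex scan over the module source.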

const globalSyscalls = new AsyncLocalStorage<Syscalls>();
export const globalConsoleState = new AsyncLocalStorage<ConsoleState>();
export const globalDevConsole = new AsyncLocalStorage<Console>();

(globalThis as any).Convex = {
  syscall: (op: string, jsonArgs: string) => {
    const syscalls = globalSyscalls.getStore();
    if (!syscalls) {
      throw new Error(`Cannot invoke syscall during module imports`);
    }
    return syscalls.syscall(op, jsonArgs);
  },
  asyncSyscall: (op: string, jsonArgs: string) => {
    const syscalls = globalSyscalls.getStore();
    if (!syscalls) {
      throw new Error(`Cannot invoke syscall during module imports`);
    }
    return syscalls.asyncSyscall(op, jsonArgs);
  },
  jsSyscall: (op: string, args: Record<string, any>) => {
    const syscalls = globalSyscalls.getStore();
    if (!syscalls) {
      throw new Error(`Cannot invoke syscall during module imports`);
    }
    return syscalls.asyncJsSyscall(op, args);
  },
};

export const ogConsole = globalThis.console;
Object.defineProperty(globalThis, "console", {
  get() {
    return globalDevConsole.getStore()!;
  },
});

function toString(value: unknown, defaultValue: string) {
  return value === undefined
    ? defaultValue
    : value === null
      ? "null"
      : value.toString();
}

type ConsoleState = {
  sentLines: number;
  totalSentLineLength: number;
  logLimitHit: boolean;
  timers: Map<string, number>;
};

export function defaultConsoleState(): ConsoleState {
  return {
    sentLines: 0,
    totalSentLineLength: 0,
    logLimitHit: false,
    timers: new Map(),
  };
}
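
// --- Illustrative sketch (not part of the original executor.ts) ---
// The globalThis.Convex shims above only work inside globalSyscalls.run():
// that AsyncLocalStorage scope is what ties each syscall to the request that
// issued it. Outside of run() (e.g. at module import time) getStore() returns
// undefined and the shim throws "Cannot invoke syscall during module imports".
// The op name below is illustrative, not a documented syscall.
async function exampleSyscallScoping(syscalls: Syscalls) {
  return await globalSyscalls.run(syscalls, async () => {
    // Inside run(): the store is populated, so the shim forwards the call to
    // the per-request SyscallsImpl, which talks to the backend.
    return (globalThis as any).Convex.asyncSyscall("example/op", "{}");
  });
}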

export function setupConsole(responseStream: Writable) {
  // TODO(presley): For some reason capturing stdout and stderr doesn't work in
  // AWS Lambda. Not sure if it is an async issue or AWS does something weird
  // where they patch the node:console Console object. For now we will throw
  // away stdout and stderr and override a few methods directly.
  const lineBuffer = new Writable({
    construct: (callback) => {
      callback();
    },
    write: (chunk, encoding, callback) => {
      //this.consoleBuffer.push(chunk.toString());
      callback();
    },
  });
  const devConsole = new Console({
    stdout: lineBuffer,
    stderr: lineBuffer,
  });

  // TODO: This code is copy & pasted from setup.ts in v8. We should
  // probably unify it at some point.
  const consoleState = globalConsoleState.getStore()!;
  function consoleMessage(level: string, ...args: any[]) {
    // TODO: Support string substitution.
    // TODO: Implement the rest of the Console API.
    const serializedArgs = args.map((e: any) =>
      inspect(e, {
        // Our entire log line can't be more than 32768 bytes (MAX_LOG_LINE_LENGTH),
        // so keep strings here to no more than 32768 UTF-16 code units, and let
        // the backend truncate the whole log line if it is too long.
        maxStringLength: 32768,
        customInspect: true,
      }),
    );
    let messages = serializedArgs;
    // Requirements:
    // - 6MB limit on AWS lambda response size, so only collect
    //   maximum 2MB of logs, one ~million UTF16 code units (UTF16
    //   code unit is 2 bytes).
    // - we only allow max 256 logs, see MAX_LOG_LINES
    if (consoleState.logLimitHit === true) {
      return;
    }
    const totalMessageLength =
      messages.reduce((acc, current) => acc + current.length + 1, 0) - 1;
    if (consoleState.totalSentLineLength + totalMessageLength > 1_048_576) {
      level = "ERROR";
      messages = [
        "Log overflow (maximum 1M characters). Remaining log lines omitted.",
      ];
      consoleState.logLimitHit = true;
    } else if (consoleState.sentLines >= 256) {
      level = "ERROR";
      messages = ["Log overflow (maximum 256). Remaining log lines omitted."];
      consoleState.logLimitHit = true;
    }
    responseStream.write(
      JSON.stringify({
        kind: "LogLine",
        data: {
          messages,
          isTruncated: false,
          timestamp: Date.now(),
          level,
        },
      }) + "\n",
    );
    consoleState.totalSentLineLength += totalMessageLength;
    consoleState.sentLines += 1;
  }
  devConsole.debug = function (...args) {
    consoleMessage("DEBUG", ...args);
  };
  devConsole.error = function (...args) {
    consoleMessage("ERROR", ...args);
  };
  devConsole.info = function (...args) {
    consoleMessage("INFO", ...args);
  };
  devConsole.log = function (...args) {
    consoleMessage("LOG", ...args);
  };
  devConsole.warn = function (...args) {
    consoleMessage("WARN", ...args);
  };
  devConsole.time = function (label: unknown) {
    const labelStr = toString(label, "default");
    if (consoleState.timers.has(labelStr)) {
      consoleMessage("WARN", `Timer '${labelStr}' already exists`);
    } else {
      consoleState.timers.set(labelStr, Date.now());
    }
  };
  devConsole.timeLog = function (label: unknown, ...args: any[]) {
    const labelStr = toString(label, "default");
    const time = consoleState.timers.get(labelStr);
    if (time === undefined) {
      consoleMessage("WARN", `Timer '${labelStr}' does not exist`);
    } else {
      const duration = Date.now() - time;
      consoleMessage("INFO", `${labelStr}: ${duration}ms`, ...args);
    }
  };
  devConsole.timeEnd = function (label: unknown) {
    const labelStr = toString(label, "default");
    const time = consoleState.timers.get(labelStr);
    if (time === undefined) {
      consoleMessage("WARN", `Timer '${labelStr}' does not exist`);
    } else {
      const duration = Date.now() - time;
      consoleState.timers.delete(labelStr);
      consoleMessage("INFO", `${labelStr}: ${duration}ms`);
    }
  };
  return devConsole;
}
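
// --- Illustrative sketch (not part of the original executor.ts) ---
// How logging flows end to end: invoke() wires up the console state and the
// request console exactly as below, so every console.log()/warn()/etc. call
// inside a UDF is serialized with util.inspect and written to the response
// stream as a newline-delimited JSON record, e.g.
//   {"kind":"LogLine","data":{"messages":["hello","{ a: 1 }"],
//    "isTruncated":false,"timestamp":1700000000000,"level":"LOG"}}
// (timestamp made up). Output is capped at roughly one million characters and
// 256 lines per request; past either limit a single "Log overflow" ERROR line
// is emitted and the rest are dropped.
function exampleLogLine(responseStream: Writable) {
  return globalConsoleState.run(defaultConsoleState(), () => {
    // setupConsole() reads the console state from the surrounding run() scope.
    const devConsole = setupConsole(responseStream);
    return globalDevConsole.run(devConsole, () => {
      console.log("hello", { a: 1 });
    });
  });
}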

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/get-convex/convex-backend'

If you have feedback or need assistance with the MCP directory API, please join our Discord server