aggregate
Execute aggregation pipelines on MongoDB collections to transform, filter, and analyze data through multi-stage processing.
Instructions
Run an aggregation against a MongoDB collection
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| database | Yes | Database name | |
| collection | Yes | Collection name | |
| pipeline | Yes | An array of aggregation stages to execute. | |
| responseBytesLimit | No | The maximum number of bytes to return in the response. This value is capped by the server's configured maxBytesPerQuery and cannot be exceeded. Note to LLM: If the entire aggregation result is required, use the "export" tool instead of increasing this limit. | 1 MB (`ONE_MB`) |
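
For illustration, here is a hypothetical set of arguments for this tool; the database, collection, and field names are made up. It runs a two-stage pipeline that filters and groups documents, with the response size limit lowered from the 1 MB default.

```typescript
// Hypothetical arguments for the "aggregate" tool; all names and values are illustrative.
const exampleArgs = {
    database: "sales",
    collection: "orders",
    pipeline: [
        { $match: { status: "shipped" } },
        { $group: { _id: "$region", total: { $sum: "$amount" } } },
    ],
    // Optional; the server caps this at its configured maxBytesPerQuery.
    responseBytesLimit: 512 * 1024,
};
```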
Implementation Reference
- The `AggregateTool` class provides the core handler logic for the 'aggregate' tool, including validation, index checks, embedding generation if needed, execution of the aggregation pipeline with limits, and formatting of results.

  ```typescript
  export class AggregateTool extends MongoDBToolBase {
      public name = "aggregate";
      protected description = "Run an aggregation against a MongoDB collection";
      protected argsShape = {
          ...DbOperationArgs,
          ...getAggregateArgs(this.isFeatureEnabled("search")),
      };
      static operationType: OperationType = "read";

      protected async execute(
          { database, collection, pipeline, responseBytesLimit }: ToolArgs<typeof this.argsShape>,
          { signal }: ToolExecutionContext
      ): Promise<CallToolResult> {
          let aggregationCursor: AggregationCursor | undefined = undefined;
          try {
              const provider = await this.ensureConnected();
              await this.assertOnlyUsesPermittedStages(pipeline);

              if (await this.session.isSearchSupported()) {
                  assertVectorSearchFilterFieldsAreIndexed({
                      searchIndexes: (await provider.getSearchIndexes(database, collection)) as SearchIndex[],
                      pipeline,
                      logger: this.session.logger,
                  });
              }

              // Check if aggregate operation uses an index if enabled
              if (this.config.indexCheck) {
                  const [usesVectorSearchIndex, indexName] = await this.isVectorSearchIndexUsed({
                      database,
                      collection,
                      pipeline,
                  });
                  switch (usesVectorSearchIndex) {
                      case "not-vector-search-query":
                          await checkIndexUsage(provider, database, collection, "aggregate", async () => {
                              return provider
                                  .aggregate(database, collection, pipeline, {}, { writeConcern: undefined })
                                  .explain("queryPlanner");
                          });
                          break;
                      case "non-existent-index":
                          throw new MongoDBError(
                              ErrorCodes.AtlasVectorSearchIndexNotFound,
                              `Could not find an index with name "${indexName}" in namespace "${database}.${collection}".`
                          );
                      case "valid-index":
                      // nothing to do, everything is correct so ready to run the query
                  }
              }

              pipeline = await this.replaceRawValuesWithEmbeddingsIfNecessary({
                  database,
                  collection,
                  pipeline,
              });

              const cappedResultsPipeline = [...pipeline];
              if (this.config.maxDocumentsPerQuery > 0) {
                  cappedResultsPipeline.push({ $limit: this.config.maxDocumentsPerQuery });
              }

              aggregationCursor = provider.aggregate(database, collection, cappedResultsPipeline);

              const [totalDocuments, cursorResults] = await Promise.all([
                  this.countAggregationResultDocuments({ provider, database, collection, pipeline }),
                  collectCursorUntilMaxBytesLimit({
                      cursor: aggregationCursor,
                      configuredMaxBytesPerQuery: this.config.maxBytesPerQuery,
                      toolResponseBytesLimit: responseBytesLimit,
                      abortSignal: signal,
                  }),
              ]);

              // If the total number of documents that the aggregation would've
              // resulted in would be greater than the configured
              // maxDocumentsPerQuery then we know for sure that the results were
              // capped.
              const aggregationResultsCappedByMaxDocumentsLimit =
                  this.config.maxDocumentsPerQuery > 0 &&
                  !!totalDocuments &&
                  totalDocuments > this.config.maxDocumentsPerQuery;

              return {
                  content: formatUntrustedData(
                      this.generateMessage({
                          aggResultsCount: totalDocuments,
                          documents: cursorResults.documents,
                          appliedLimits: [
                              aggregationResultsCappedByMaxDocumentsLimit ? "config.maxDocumentsPerQuery" : undefined,
                              cursorResults.cappedBy,
                          ].filter((limit): limit is keyof typeof CURSOR_LIMITS_TO_LLM_TEXT => !!limit),
                      }),
                      ...(cursorResults.documents.length > 0 ? [EJSON.stringify(cursorResults.documents)] : [])
                  ),
              };
          } finally {
              if (aggregationCursor) {
                  void this.safeCloseCursor(aggregationCursor);
              }
          }
      }

      private async safeCloseCursor(cursor: AggregationCursor<unknown>): Promise<void> {
          try {
              await cursor.close();
          } catch (error) {
              this.session.logger.warning({
                  id: LogId.mongodbCursorCloseError,
                  context: "aggregate tool",
                  message: `Error when closing the cursor - ${error instanceof Error ? error.message : String(error)}`,
              });
          }
      }

      private async assertOnlyUsesPermittedStages(pipeline: Record<string, unknown>[]): Promise<void> {
          const writeOperations: OperationType[] = ["update", "create", "delete"];
          const isSearchSupported = await this.session.isSearchSupported();
          let writeStageForbiddenError = "";
          if (this.config.readOnly) {
              writeStageForbiddenError = "In readOnly mode you can not run pipelines with $out or $merge stages.";
          } else if (this.config.disabledTools.some((t) => writeOperations.includes(t as OperationType))) {
              writeStageForbiddenError =
                  "When 'create', 'update', or 'delete' operations are disabled, you can not run pipelines with $out or $merge stages.";
          }

          for (const stage of pipeline) {
              // This validates that in readOnly mode, or when "write" operations are disabled, we can't use $out or $merge.
              // This is really important because aggregates are the only "multi-faceted" tool in the MQL, where you
              // can both read and write.
              if ((stage.$out || stage.$merge) && writeStageForbiddenError) {
                  throw new MongoDBError(ErrorCodes.ForbiddenWriteOperation, writeStageForbiddenError);
              }

              // This ensures that you can't use $vectorSearch if the cluster does not support MongoDB Search,
              // either in Atlas or in a local cluster.
              if (stage.$vectorSearch && !isSearchSupported) {
                  throw new MongoDBError(
                      ErrorCodes.AtlasSearchNotSupported,
                      "Atlas Search is not supported in this cluster."
                  );
              }
          }
      }

      private async countAggregationResultDocuments({
          provider,
          database,
          collection,
          pipeline,
      }: {
          provider: NodeDriverServiceProvider;
          database: string;
          collection: string;
          pipeline: Document[];
      }): Promise<number | undefined> {
          const resultsCountAggregation = [...pipeline, { $count: "totalDocuments" }];
          return await operationWithFallback(async (): Promise<number | undefined> => {
              const aggregationResults = await provider
                  .aggregate(database, collection, resultsCountAggregation)
                  .maxTimeMS(AGG_COUNT_MAX_TIME_MS_CAP)
                  .toArray();
              const documentWithCount: unknown = aggregationResults.length === 1 ? aggregationResults[0] : undefined;
              const totalDocuments =
                  documentWithCount &&
                  typeof documentWithCount === "object" &&
                  "totalDocuments" in documentWithCount &&
                  typeof documentWithCount.totalDocuments === "number"
                      ? documentWithCount.totalDocuments
                      : 0;
              return totalDocuments;
          }, undefined);
      }

      private async replaceRawValuesWithEmbeddingsIfNecessary({
          database,
          collection,
          pipeline,
      }: {
          database: string;
          collection: string;
          pipeline: Document[];
      }): Promise<Document[]> {
          for (const stage of pipeline) {
              if ("$vectorSearch" in stage) {
                  const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;
                  if (Array.isArray(vectorSearchStage.queryVector)) {
                      continue;
                  }

                  if (!vectorSearchStage.embeddingParameters) {
                      throw new MongoDBError(
                          ErrorCodes.AtlasVectorSearchInvalidQuery,
                          "embeddingModel is mandatory if queryVector is a raw string."
                      );
                  }

                  const embeddingParameters = vectorSearchStage.embeddingParameters;
                  delete vectorSearchStage.embeddingParameters;

                  await this.session.vectorSearchEmbeddingsManager.assertVectorSearchIndexExists({
                      database,
                      collection,
                      path: vectorSearchStage.path,
                  });

                  const [embeddings] = await this.session.vectorSearchEmbeddingsManager.generateEmbeddings({
                      rawValues: [vectorSearchStage.queryVector],
                      embeddingParameters,
                      inputType: "query",
                  });
                  if (!embeddings) {
                      throw new MongoDBError(
                          ErrorCodes.AtlasVectorSearchInvalidQuery,
                          "Failed to generate embeddings for the query vector."
                      );
                  }

                  // $vectorSearch.queryVector can be a BSON.Binary, which is neither a number nor an array.
                  // It's not exactly valid from the LLM perspective (they can't provide binaries).
                  // That's why we overwrite the stage in an untyped way, as what we expose and what LLMs can use is different.
                  vectorSearchStage.queryVector = embeddings as string | number[];
              }
          }

          await this.session.vectorSearchEmbeddingsManager.assertFieldsHaveCorrectEmbeddings(
              { database, collection },
              pipeline
          );
          return pipeline;
      }

      private async isVectorSearchIndexUsed({
          database,
          collection,
          pipeline,
      }: {
          database: string;
          collection: string;
          pipeline: Document[];
      }): Promise<["valid-index" | "non-existent-index" | "not-vector-search-query", string?]> {
          // check if the pipeline contains a $vectorSearch stage
          let usesVectorSearch = false;
          let indexName: string = "default";
          for (const stage of pipeline) {
              if ("$vectorSearch" in stage) {
                  const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;
                  usesVectorSearch = true;
                  indexName = vectorSearchStage.index;
                  break;
              }
          }

          if (!usesVectorSearch) {
              return ["not-vector-search-query"];
          }

          const indexExists = await this.session.vectorSearchEmbeddingsManager.indexExists({
              database,
              collection,
              indexName,
          });

          return [indexExists ? "valid-index" : "non-existent-index", indexName];
      }

      private generateMessage({
          aggResultsCount,
          documents,
          appliedLimits,
      }: {
          aggResultsCount: number | undefined;
          documents: unknown[];
          appliedLimits: (keyof typeof CURSOR_LIMITS_TO_LLM_TEXT)[];
      }): string {
          const appliedLimitText = appliedLimits.length
              ? `\
  while respecting the applied limits of ${appliedLimits.map((limit) => CURSOR_LIMITS_TO_LLM_TEXT[limit]).join(", ")}. \
  Note to LLM: If the entire query result is required then use "export" tool to export the query results.\
  `
              : "";

          return `\
  The aggregation resulted in ${aggResultsCount === undefined ? "indeterminable number of" : aggResultsCount} documents. \
  Returning ${documents.length} documents${appliedLimitText ? ` ${appliedLimitText}` : "."}\
  `;
      }

      protected resolveTelemetryMetadata(
          args: ToolArgs<typeof this.argsShape>,
          { result }: { result: CallToolResult }
      ): ConnectionMetadata | AutoEmbeddingsUsageMetadata {
          const [maybeVectorStage] = args.pipeline;
          if (
              maybeVectorStage !== null &&
              maybeVectorStage instanceof Object &&
              "$vectorSearch" in maybeVectorStage &&
              "embeddingParameters" in maybeVectorStage["$vectorSearch"] &&
              this.config.voyageApiKey
          ) {
              return {
                  ...super.resolveTelemetryMetadata(args, { result }),
                  embeddingsGeneratedBy: "mcp",
              };
          } else {
              return super.resolveTelemetryMetadata(args, { result });
          }
      }
  }
  ```
- Defines the input schema (`argsShape`) for the aggregate tool, including the pipeline array with support for vector search stages and the optional `responseBytesLimit`.

  ```typescript
  export const getAggregateArgs = (vectorSearchEnabled: boolean) =>
      ({
          pipeline: z
              .array(vectorSearchEnabled ? z.union([VectorSearchStage, AnyAggregateStage]) : AnyAggregateStage)
              .describe(vectorSearchEnabled ? pipelineDescriptionWithVectorSearch : genericPipelineDescription),
          responseBytesLimit: z.number().optional().default(ONE_MB).describe(`\
  The maximum number of bytes to return in the response. This value is capped by the server's configured maxBytesPerQuery and cannot be exceeded. \
  Note to LLM: If the entire aggregation result is required, use the "export" tool instead of increasing this limit.\
  `),
      }) as const;
  ```
- Zod schemas for aggregation stages, including `AnyAggregateStage` and `VectorSearchStage`, used in the aggregate tool's pipeline validation. A hypothetical stage that satisfies the `VectorSearchStage` schema is sketched after this list.

  ```typescript
  export const AnyAggregateStage = zEJSON();

  export const VectorSearchStage = z.object({
      $vectorSearch: z
          .object({
              exact: z
                  .boolean()
                  .optional()
                  .default(false)
                  .describe(
                      "When true, uses an ENN algorithm, otherwise uses ANN. Using ENN is not compatible with numCandidates, in that case, numCandidates must be left empty."
                  ),
              index: z.string().describe("Name of the index, as retrieved from the `collection-indexes` tool."),
              path: z
                  .string()
                  .describe(
                      "Field, in dot notation, where to search. There must be a vector search index for that field. Note to LLM: When unsure, use the 'collection-indexes' tool to validate that the field is indexed with a vector search index."
                  ),
              queryVector: z
                  .union([z.string(), z.array(z.number())])
                  .describe(
                      "The content to search for. The embeddingParameters field is mandatory if the queryVector is a string, in that case, the tool generates the embedding automatically using the provided configuration."
                  ),
              numCandidates: z
                  .number()
                  .int()
                  .positive()
                  .optional()
                  .describe("Number of candidates for the ANN algorithm. Mandatory when exact is false."),
              limit: z.number().int().positive().optional().default(10),
              filter: zEJSON()
                  .optional()
                  .describe(
                      "MQL filter that can only use filter fields from the index definition. Note to LLM: If unsure, use the `collection-indexes` tool to learn which fields can be used for filtering."
                  ),
              embeddingParameters: zSupportedEmbeddingParameters
                  .optional()
                  .describe(
                      "The embedding model and its parameters to use to generate embeddings before searching. It is mandatory if queryVector is a string value. Note to LLM: If unsure, ask the user before providing one."
                  ),
          })
          .passthrough(),
  });
  ```
- `src/tools/mongodb/tools.ts:13-13` (registration): re-exports the `AggregateTool` for inclusion in the MongoDB tools module.

  ```typescript
  export { AggregateTool } from "./read/aggregate.js";
  ```
- `src/tools/index.ts:7-11` (registration): defines the `AllTools` array, which includes all MongoDB tools (via `MongoDbTools`) and is used by the server to register tools, including `AggregateTool`.

  ```typescript
  export const AllTools: ToolClass[] = Object.values({
      ...MongoDbTools,
      ...AtlasTools,
      ...AtlasLocalTools,
  });
  ```
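
As referenced in the `VectorSearchStage` entry above, here is a minimal sketch of a `$vectorSearch` stage that the schema would accept. The index name, field path, and query text are assumptions for illustration, and the shape of `embeddingParameters` depends on `zSupportedEmbeddingParameters`, which is not shown in this section.

```typescript
// A hypothetical $vectorSearch stage accepted by the VectorSearchStage schema above.
// Because queryVector is a raw string, embeddingParameters must be supplied so the tool
// can generate the embedding itself before running the search.
const exampleVectorSearchStage = {
    $vectorSearch: {
        index: "plot_vector_index", // assumed index name; verify with the collection-indexes tool
        path: "plot_embedding", // assumed field covered by a vector search index
        queryVector: "a heist set on a space station", // raw text to be embedded automatically
        numCandidates: 100, // required because exact defaults to false (ANN search)
        limit: 10,
        embeddingParameters: {
            // model configuration matching zSupportedEmbeddingParameters (shape not shown here)
        },
    },
};
```

Passing an array of numbers as `queryVector` instead skips embedding generation entirely, since the handler only generates embeddings for raw string values.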