aggregate
Perform database aggregation operations on MongoDB collections by executing a pipeline of stages, enabling data transformation and analysis.
Instructions
Run an aggregation against a MongoDB collection
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| collection | Yes | Collection name | |
| database | Yes | Database name | |
| pipeline | Yes | An array of aggregation stages to execute | |
Input Schema (JSON Schema)
{
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": false,
"properties": {
"collection": {
"description": "Collection name",
"type": "string"
},
"database": {
"description": "Database name",
"type": "string"
},
"pipeline": {
"description": "An array of aggregation stages to execute",
"items": {
"additionalProperties": true,
"properties": {},
"type": "object"
},
"type": "array"
}
},
"required": [
"database",
"collection",
"pipeline"
],
"type": "object"
}
Implementation Reference
- The AggregateTool class implements the core logic for the 'aggregate' tool, executing aggregation pipelines on MongoDB collections with support for vector search, limits, and safety checks.
/**
 * Tool that runs an aggregation pipeline against a MongoDB collection.
 *
 * Before running the pipeline it rejects forbidden stages, optionally checks
 * index usage, and replaces raw-string `$vectorSearch.queryVector` values with
 * generated embeddings. Results are capped by `config.maxDocumentsPerQuery`
 * and by the byte limits handed to `collectCursorUntilMaxBytesLimit`.
 */
export class AggregateTool extends MongoDBToolBase {
    public name = "aggregate";
    protected description = "Run an aggregation against a MongoDB collection";
    protected argsShape = {
        ...DbOperationArgs,
        ...getAggregateArgs(this.isFeatureEnabled("search")),
    };
    static operationType: OperationType = "read";

    /**
     * Validates and runs the aggregation, returning a summary message plus the
     * EJSON-serialised documents (when there are any).
     *
     * @param args - Tool arguments: target namespace, pipeline and the per-call byte limit.
     * @param context - Execution context; only the abort `signal` is used.
     * @returns The tool result wrapped by `formatUntrustedData`.
     * @throws MongoDBError when a stage is not permitted, a referenced vector
     *   search index does not exist, or embeddings cannot be generated.
     */
    protected async execute(
        { database, collection, pipeline, responseBytesLimit }: ToolArgs<typeof this.argsShape>,
        { signal }: ToolExecutionContext
    ): Promise<CallToolResult> {
        let aggregationCursor: AggregationCursor | undefined = undefined;
        try {
            const provider = await this.ensureConnected();
            await this.assertOnlyUsesPermittedStages(pipeline);
            if (await this.session.isSearchSupported()) {
                assertVectorSearchFilterFieldsAreIndexed({
                    searchIndexes: (await provider.getSearchIndexes(database, collection)) as SearchIndex[],
                    pipeline,
                    logger: this.session.logger,
                });
            }

            // Check if aggregate operation uses an index if enabled
            if (this.config.indexCheck) {
                const [usesVectorSearchIndex, indexName] = await this.isVectorSearchIndexUsed({
                    database,
                    collection,
                    pipeline,
                });
                switch (usesVectorSearchIndex) {
                    case "not-vector-search-query":
                        await checkIndexUsage(provider, database, collection, "aggregate", async () => {
                            return provider
                                .aggregate(database, collection, pipeline, {}, { writeConcern: undefined })
                                .explain("queryPlanner");
                        });
                        break;
                    case "non-existent-index":
                        throw new MongoDBError(
                            ErrorCodes.AtlasVectorSearchIndexNotFound,
                            `Could not find an index with name "${indexName}" in namespace "${database}.${collection}".`
                        );
                    case "valid-index":
                    // nothing to do, everything is correct so ready to run the query
                }
            }

            pipeline = await this.replaceRawValuesWithEmbeddingsIfNecessary({
                database,
                collection,
                pipeline,
            });

            // MongoDB only accepts $out / $merge as the final stage of a pipeline, so
            // nothing ($limit for the cap, $count for the total) may be appended after
            // them. Such pipelines run as provided and their total is reported as
            // indeterminable.
            const hasWriteStage = pipeline.some((stage) => this.isWriteStage(stage));

            const cappedResultsPipeline = [...pipeline];
            if (!hasWriteStage && this.config.maxDocumentsPerQuery > 0) {
                cappedResultsPipeline.push({ $limit: this.config.maxDocumentsPerQuery });
            }
            aggregationCursor = provider.aggregate(database, collection, cappedResultsPipeline);

            const [totalDocuments, cursorResults] = await Promise.all([
                hasWriteStage
                    ? undefined
                    : this.countAggregationResultDocuments({ provider, database, collection, pipeline }),
                collectCursorUntilMaxBytesLimit({
                    cursor: aggregationCursor,
                    configuredMaxBytesPerQuery: this.config.maxBytesPerQuery,
                    toolResponseBytesLimit: responseBytesLimit,
                    abortSignal: signal,
                }),
            ]);

            // If the total number of documents that the aggregation would've
            // resulted in would be greater than the configured
            // maxDocumentsPerQuery then we know for sure that the results were
            // capped.
            const aggregationResultsCappedByMaxDocumentsLimit =
                this.config.maxDocumentsPerQuery > 0 &&
                !!totalDocuments &&
                totalDocuments > this.config.maxDocumentsPerQuery;

            return {
                content: formatUntrustedData(
                    this.generateMessage({
                        aggResultsCount: totalDocuments,
                        documents: cursorResults.documents,
                        appliedLimits: [
                            aggregationResultsCappedByMaxDocumentsLimit ? "config.maxDocumentsPerQuery" : undefined,
                            cursorResults.cappedBy,
                        ].filter((limit): limit is keyof typeof CURSOR_LIMITS_TO_LLM_TEXT => !!limit),
                    }),
                    ...(cursorResults.documents.length > 0 ? [EJSON.stringify(cursorResults.documents)] : [])
                ),
            };
        } finally {
            if (aggregationCursor) {
                // Fire-and-forget: safeCloseCursor catches and logs its own errors.
                void this.safeCloseCursor(aggregationCursor);
            }
        }
    }

    /** Closes the cursor, logging (rather than propagating) any error raised while closing. */
    private async safeCloseCursor(cursor: AggregationCursor<unknown>): Promise<void> {
        try {
            await cursor.close();
        } catch (error) {
            this.session.logger.warning({
                id: LogId.mongodbCursorCloseError,
                context: "aggregate tool",
                message: `Error when closing the cursor - ${error instanceof Error ? error.message : String(error)}`,
            });
        }
    }

    /** Returns true when the stage carries a truthy `$out` or `$merge` key, i.e. it writes data. */
    private isWriteStage(stage: Record<string, unknown>): boolean {
        return Boolean(stage.$out || stage.$merge);
    }

    /**
     * Rejects pipelines containing stages the current configuration or cluster does not allow.
     *
     * @throws MongoDBError with `ForbiddenWriteOperation` for `$out`/`$merge` when the server is
     *   read-only or any of the create/update/delete tools is disabled, and with
     *   `AtlasSearchNotSupported` for `$vectorSearch` when search is not supported.
     */
    private async assertOnlyUsesPermittedStages(pipeline: Record<string, unknown>[]): Promise<void> {
        const writeOperations: OperationType[] = ["update", "create", "delete"];
        const isSearchSupported = await this.session.isSearchSupported();

        // An empty string means write stages are allowed.
        let writeStageForbiddenError = "";

        if (this.config.readOnly) {
            writeStageForbiddenError = "In readOnly mode you can not run pipelines with $out or $merge stages.";
        } else if (this.config.disabledTools.some((t) => writeOperations.includes(t as OperationType))) {
            writeStageForbiddenError =
                "When 'create', 'update', or 'delete' operations are disabled, you can not run pipelines with $out or $merge stages.";
        }

        for (const stage of pipeline) {
            // This validates that in readOnly mode or when "write" operations are disabled, we can't use
            // $out or $merge. This is really important because aggregates are the only "multi-faceted"
            // tool in the MQL, where you can both read and write.
            if (this.isWriteStage(stage) && writeStageForbiddenError) {
                throw new MongoDBError(ErrorCodes.ForbiddenWriteOperation, writeStageForbiddenError);
            }

            // This ensures that you can't use $vectorSearch if the cluster does not support MongoDB Search
            // either in Atlas or in a local cluster.
            if (stage.$vectorSearch && !isSearchSupported) {
                throw new MongoDBError(
                    ErrorCodes.AtlasSearchNotSupported,
                    "Atlas Search is not supported in this cluster."
                );
            }
        }
    }

    /**
     * Counts how many documents the uncapped pipeline produces by appending a `$count` stage.
     *
     * @returns The count, 0 when the count aggregation does not yield exactly one document with a
     *   numeric `totalDocuments` field, or the `undefined` fallback passed to `operationWithFallback`
     *   (presumably used when the count operation fails — confirm against that helper).
     */
    private async countAggregationResultDocuments({
        provider,
        database,
        collection,
        pipeline,
    }: {
        provider: NodeDriverServiceProvider;
        database: string;
        collection: string;
        pipeline: Document[];
    }): Promise<number | undefined> {
        const resultsCountAggregation = [...pipeline, { $count: "totalDocuments" }];
        return await operationWithFallback(async (): Promise<number | undefined> => {
            const aggregationResults = await provider
                .aggregate(database, collection, resultsCountAggregation)
                .maxTimeMS(AGG_COUNT_MAX_TIME_MS_CAP)
                .toArray();

            const documentWithCount: unknown = aggregationResults.length === 1 ? aggregationResults[0] : undefined;
            const totalDocuments =
                documentWithCount &&
                typeof documentWithCount === "object" &&
                "totalDocuments" in documentWithCount &&
                typeof documentWithCount.totalDocuments === "number"
                    ? documentWithCount.totalDocuments
                    : 0;

            return totalDocuments;
        }, undefined);
    }

    /**
     * For every `$vectorSearch` stage whose `queryVector` is not already an array, generates an
     * embedding from the raw value and writes it back into the stage.
     *
     * Note: the stages of the given pipeline are modified in place (`embeddingParameters` is removed
     * and `queryVector` is overwritten); the same array is returned.
     *
     * @throws MongoDBError with `AtlasVectorSearchInvalidQuery` when `embeddingParameters` is missing
     *   or no embedding is produced.
     */
    private async replaceRawValuesWithEmbeddingsIfNecessary({
        database,
        collection,
        pipeline,
    }: {
        database: string;
        collection: string;
        pipeline: Document[];
    }): Promise<Document[]> {
        for (const stage of pipeline) {
            if ("$vectorSearch" in stage) {
                const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;

                // Already a numeric vector: nothing to embed.
                if (Array.isArray(vectorSearchStage.queryVector)) {
                    continue;
                }

                if (!vectorSearchStage.embeddingParameters) {
                    throw new MongoDBError(
                        ErrorCodes.AtlasVectorSearchInvalidQuery,
                        "embeddingParameters is mandatory if queryVector is a raw string."
                    );
                }

                // embeddingParameters is a tool-level option, so it is stripped from the stage
                // before the stage is used any further.
                const embeddingParameters = vectorSearchStage.embeddingParameters;
                delete vectorSearchStage.embeddingParameters;

                await this.session.vectorSearchEmbeddingsManager.assertVectorSearchIndexExists({
                    database,
                    collection,
                    path: vectorSearchStage.path,
                });

                const [embeddings] = await this.session.vectorSearchEmbeddingsManager.generateEmbeddings({
                    rawValues: [vectorSearchStage.queryVector],
                    embeddingParameters,
                    inputType: "query",
                });

                if (!embeddings) {
                    throw new MongoDBError(
                        ErrorCodes.AtlasVectorSearchInvalidQuery,
                        "Failed to generate embeddings for the query vector."
                    );
                }

                // $vectorSearch.queryVector can be a BSON.Binary, which is neither a string nor a number array.
                // It's not exactly valid from the LLM perspective (they can't provide binaries).
                // That's why we overwrite the stage in an untyped way, as what we expose and what LLMs can use is different.
                vectorSearchStage.queryVector = embeddings as string | number[];
            }
        }

        await this.session.vectorSearchEmbeddingsManager.assertFieldsHaveCorrectEmbeddings(
            { database, collection },
            pipeline
        );

        return pipeline;
    }

    /**
     * Classifies the pipeline with respect to vector search index usage.
     *
     * @returns `["not-vector-search-query"]` when no stage has a `$vectorSearch` key; otherwise
     *   `["valid-index" | "non-existent-index", indexName]` for the first `$vectorSearch` stage found.
     */
    private async isVectorSearchIndexUsed({
        database,
        collection,
        pipeline,
    }: {
        database: string;
        collection: string;
        pipeline: Document[];
    }): Promise<["valid-index" | "non-existent-index" | "not-vector-search-query", string?]> {
        // check if the pipeline contains a $vectorSearch stage
        let usesVectorSearch = false;
        let indexName = "default";
        for (const stage of pipeline) {
            if ("$vectorSearch" in stage) {
                const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;
                usesVectorSearch = true;
                indexName = vectorSearchStage.index;
                break;
            }
        }

        if (!usesVectorSearch) {
            return ["not-vector-search-query"];
        }

        const indexExists = await this.session.vectorSearchEmbeddingsManager.indexExists({
            database,
            collection,
            indexName,
        });

        return [indexExists ? "valid-index" : "non-existent-index", indexName];
    }

    /**
     * Builds the human/LLM readable summary: total result count (or "indeterminable number of" when
     * unknown), number of returned documents, and the limits that were applied, if any.
     */
    private generateMessage({
        aggResultsCount,
        documents,
        appliedLimits,
    }: {
        aggResultsCount: number | undefined;
        documents: unknown[];
        appliedLimits: (keyof typeof CURSOR_LIMITS_TO_LLM_TEXT)[];
    }): string {
        // Built with plain concatenation so the text does not depend on backslash-newline
        // continuations inside template literals.
        const appliedLimitText = appliedLimits.length
            ? `while respecting the applied limits of ${appliedLimits
                  .map((limit) => CURSOR_LIMITS_TO_LLM_TEXT[limit])
                  .join(", ")}. ` +
              'Note to LLM: If the entire query result is required then use "export" tool to export the query results.'
            : "";

        return (
            `The aggregation resulted in ${aggResultsCount === undefined ? "indeterminable number of" : aggResultsCount} documents. ` +
            `Returning ${documents.length} documents${appliedLimitText ? ` ${appliedLimitText}` : "."}`
        );
    }
}
- getAggregateArgs function defines the Zod input schema for the aggregate tool's pipeline and response limits.
/**
 * Builds the Zod argument shape for the aggregate tool.
 *
 * @param vectorSearchEnabled - When true, pipeline stages may also match `VectorSearchStage` and
 *   the vector-search flavoured pipeline description is used.
 * @returns An object with the `pipeline` and `responseBytesLimit` schemas.
 */
export const getAggregateArgs = (vectorSearchEnabled: boolean) =>
    ({
        pipeline: z
            .array(vectorSearchEnabled ? z.union([AnyAggregateStage, VectorSearchStage]) : AnyAggregateStage)
            .describe(vectorSearchEnabled ? pipelineDescriptionWithVectorSearch : genericPipelineDescription),
        // The description is assembled with plain concatenation so it carries no stray
        // leading, doubled or trailing spaces.
        responseBytesLimit: z
            .number()
            .optional()
            .default(ONE_MB)
            .describe(
                "The maximum number of bytes to return in the response. This value is capped by the server's configured maxBytesPerQuery and cannot be exceeded. " +
                    'Note to LLM: If the entire aggregation result is required, use the "export" tool instead of increasing this limit.'
            ),
    }) as const;
- Zod schemas defining valid aggregate pipeline stages, including support for $vectorSearch, used by the aggregate tool.
/** Schema for an ordinary aggregation stage: whatever `zEJSON()` accepts. */
export const AnyAggregateStage = zEJSON();

/**
 * Options accepted inside a `$vectorSearch` stage. Keys not listed here are kept as-is
 * because the object is declared with `.passthrough()`.
 */
const vectorSearchStageOptions = z
    .object({
        exact: z
            .boolean()
            .optional()
            .default(false)
            .describe(
                "When true, uses an ENN algorithm, otherwise uses ANN. Using ENN is not compatible with numCandidates, in that case, numCandidates must be left empty."
            ),
        index: z.string().describe("Name of the index, as retrieved from the `collection-indexes` tool."),
        path: z
            .string()
            .describe(
                "Field, in dot notation, where to search. There must be a vector search index for that field. Note to LLM: When unsure, use the 'collection-indexes' tool to validate that the field is indexed with a vector search index."
            ),
        // Either a raw string (to be embedded by the tool) or an already computed numeric vector.
        queryVector: z
            .union([z.string(), z.array(z.number())])
            .describe(
                "The content to search for. The embeddingParameters field is mandatory if the queryVector is a string, in that case, the tool generates the embedding automatically using the provided configuration."
            ),
        numCandidates: z
            .number()
            .int()
            .positive()
            .optional()
            .describe("Number of candidates for the ANN algorithm. Mandatory when exact is false."),
        limit: z.number().int().positive().optional().default(10),
        filter: zEJSON()
            .optional()
            .describe(
                "MQL filter that can only use filter fields from the index definition. Note to LLM: If unsure, use the `collection-indexes` tool to learn which fields can be used for filtering."
            ),
        embeddingParameters: zSupportedEmbeddingParameters
            .optional()
            .describe(
                "The embedding model and its parameters to use to generate embeddings before searching. It is mandatory if queryVector is a string value. Note to LLM: If unsure, ask the user before providing one."
            ),
    })
    .passthrough();

/** Schema for a pipeline stage of the form `{ $vectorSearch: { ... } }`. */
export const VectorSearchStage = z.object({
    $vectorSearch: vectorSearchStageOptions,
});
- src/tools/index.ts:7-11 (registration)AllTools array registers all tools including AggregateTool (via MongoDbTools import from mongodb/tools.ts).
/**
 * Every tool class exported by the three tool namespaces, as a flat array.
 *
 * The namespaces are spread into a single object before `Object.values` is taken, so if two
 * namespaces export the same key only the value from the later spread is kept.
 */
export const AllTools: ToolClass[] = Object.values({
    ...MongoDbTools,
    ...AtlasTools,
    ...AtlasLocalTools,
});
- src/tools/mongodb/tools.ts:13-13 (registration)Export of AggregateTool making it available for inclusion in AllTools.
// Re-exports AggregateTool from its implementation module so it is part of this module's exports.
export { AggregateTool } from "./read/aggregate.js";