
MongoDB MCP Server

Official
by mongodb-js

aggregate

Read-only

Execute aggregation pipelines on MongoDB collections to transform, filter, and analyze data through multi-stage processing.

Instructions

Run an aggregation against a MongoDB collection

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| database | Yes | Database name | |
| collection | Yes | Collection name | |
| pipeline | Yes | An array of aggregation stages to execute. | |
| responseBytesLimit | No | The maximum number of bytes to return in the response. This value is capped by the server's configured maxBytesPerQuery and cannot be exceeded. Note to LLM: If the entire aggregation result is required, use the "export" tool instead of increasing this limit. | ONE_MB |
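As a sketch of how these parameters fit together, a call to the aggregate tool might pass arguments shaped like this. The database, collection, and pipeline contents below are illustrative assumptions, not values from the server:

```typescript
// Illustrative arguments for the "aggregate" tool; the database name,
// collection name, and pipeline stages are hypothetical examples.
const aggregateArgs = {
  database: "sales",
  collection: "orders",
  pipeline: [
    // Stage 1: keep only shipped orders
    { $match: { status: "shipped" } },
    // Stage 2: total the order amounts per region
    { $group: { _id: "$region", total: { $sum: "$amount" } } },
    // Stage 3: largest totals first
    { $sort: { total: -1 } },
  ],
  // Optional; the server caps this at its configured maxBytesPerQuery.
  responseBytesLimit: 1024 * 1024,
};

console.log(aggregateArgs.pipeline.length); // 3 stages
```

Each stage object maps directly to an MQL aggregation stage, so the tool accepts the same pipeline shapes you would pass to a driver's `aggregate()` call.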

Implementation Reference

  • The AggregateTool class provides the core handler logic for the 'aggregate' tool, including validation, index checks, embedding generation if needed, execution of the aggregation pipeline with limits, and formatting of results.
    export class AggregateTool extends MongoDBToolBase {
        public name = "aggregate";
        protected description = "Run an aggregation against a MongoDB collection";
        protected argsShape = {
            ...DbOperationArgs,
            ...getAggregateArgs(this.isFeatureEnabled("search")),
        };
        static operationType: OperationType = "read";
    
        protected async execute(
            { database, collection, pipeline, responseBytesLimit }: ToolArgs<typeof this.argsShape>,
            { signal }: ToolExecutionContext
        ): Promise<CallToolResult> {
            let aggregationCursor: AggregationCursor | undefined = undefined;
            try {
                const provider = await this.ensureConnected();
                await this.assertOnlyUsesPermittedStages(pipeline);
                if (await this.session.isSearchSupported()) {
                    assertVectorSearchFilterFieldsAreIndexed({
                        searchIndexes: (await provider.getSearchIndexes(database, collection)) as SearchIndex[],
                        pipeline,
                        logger: this.session.logger,
                    });
                }
    
                // Check if aggregate operation uses an index if enabled
                if (this.config.indexCheck) {
                    const [usesVectorSearchIndex, indexName] = await this.isVectorSearchIndexUsed({
                        database,
                        collection,
                        pipeline,
                    });
                    switch (usesVectorSearchIndex) {
                        case "not-vector-search-query":
                            await checkIndexUsage(provider, database, collection, "aggregate", async () => {
                                return provider
                                    .aggregate(database, collection, pipeline, {}, { writeConcern: undefined })
                                    .explain("queryPlanner");
                            });
                            break;
                        case "non-existent-index":
                            throw new MongoDBError(
                                ErrorCodes.AtlasVectorSearchIndexNotFound,
                                `Could not find an index with name "${indexName}" in namespace "${database}.${collection}".`
                            );
                        case "valid-index":
                        // nothing to do, everything is correct so ready to run the query
                    }
                }
    
                pipeline = await this.replaceRawValuesWithEmbeddingsIfNecessary({
                    database,
                    collection,
                    pipeline,
                });
    
                const cappedResultsPipeline = [...pipeline];
                if (this.config.maxDocumentsPerQuery > 0) {
                    cappedResultsPipeline.push({ $limit: this.config.maxDocumentsPerQuery });
                }
                aggregationCursor = provider.aggregate(database, collection, cappedResultsPipeline);
    
                const [totalDocuments, cursorResults] = await Promise.all([
                    this.countAggregationResultDocuments({ provider, database, collection, pipeline }),
                    collectCursorUntilMaxBytesLimit({
                        cursor: aggregationCursor,
                        configuredMaxBytesPerQuery: this.config.maxBytesPerQuery,
                        toolResponseBytesLimit: responseBytesLimit,
                        abortSignal: signal,
                    }),
                ]);
    
                // If the total number of documents that the aggregation would've
                // resulted in would be greater than the configured
                // maxDocumentsPerQuery then we know for sure that the results were
                // capped.
                const aggregationResultsCappedByMaxDocumentsLimit =
                    this.config.maxDocumentsPerQuery > 0 &&
                    !!totalDocuments &&
                    totalDocuments > this.config.maxDocumentsPerQuery;
    
                return {
                    content: formatUntrustedData(
                        this.generateMessage({
                            aggResultsCount: totalDocuments,
                            documents: cursorResults.documents,
                            appliedLimits: [
                                aggregationResultsCappedByMaxDocumentsLimit ? "config.maxDocumentsPerQuery" : undefined,
                                cursorResults.cappedBy,
                            ].filter((limit): limit is keyof typeof CURSOR_LIMITS_TO_LLM_TEXT => !!limit),
                        }),
                        ...(cursorResults.documents.length > 0 ? [EJSON.stringify(cursorResults.documents)] : [])
                    ),
                };
            } finally {
                if (aggregationCursor) {
                    void this.safeCloseCursor(aggregationCursor);
                }
            }
        }
    
        private async safeCloseCursor(cursor: AggregationCursor<unknown>): Promise<void> {
            try {
                await cursor.close();
            } catch (error) {
                this.session.logger.warning({
                    id: LogId.mongodbCursorCloseError,
                    context: "aggregate tool",
                    message: `Error when closing the cursor - ${error instanceof Error ? error.message : String(error)}`,
                });
            }
        }
    
        private async assertOnlyUsesPermittedStages(pipeline: Record<string, unknown>[]): Promise<void> {
            const writeOperations: OperationType[] = ["update", "create", "delete"];
            const isSearchSupported = await this.session.isSearchSupported();
    
            let writeStageForbiddenError = "";
    
            if (this.config.readOnly) {
                writeStageForbiddenError = "In readOnly mode you can not run pipelines with $out or $merge stages.";
            } else if (this.config.disabledTools.some((t) => writeOperations.includes(t as OperationType))) {
                writeStageForbiddenError =
                    "When 'create', 'update', or 'delete' operations are disabled, you can not run pipelines with $out or $merge stages.";
            }
    
            for (const stage of pipeline) {
            // This validates that in readOnly mode, or when "write" operations are disabled, we can't use $out or $merge.
            // This is really important because aggregates are the only "multi-faceted" tool in MQL, where you
            // can both read and write.
                if ((stage.$out || stage.$merge) && writeStageForbiddenError) {
                    throw new MongoDBError(ErrorCodes.ForbiddenWriteOperation, writeStageForbiddenError);
                }
    
            // This ensures that you can't use $vectorSearch if the cluster does not support MongoDB Search,
            // either in Atlas or in a local cluster.
                if (stage.$vectorSearch && !isSearchSupported) {
                    throw new MongoDBError(
                        ErrorCodes.AtlasSearchNotSupported,
                        "Atlas Search is not supported in this cluster."
                    );
                }
            }
        }
    
        private async countAggregationResultDocuments({
            provider,
            database,
            collection,
            pipeline,
        }: {
            provider: NodeDriverServiceProvider;
            database: string;
            collection: string;
            pipeline: Document[];
        }): Promise<number | undefined> {
            const resultsCountAggregation = [...pipeline, { $count: "totalDocuments" }];
            return await operationWithFallback(async (): Promise<number | undefined> => {
                const aggregationResults = await provider
                    .aggregate(database, collection, resultsCountAggregation)
                    .maxTimeMS(AGG_COUNT_MAX_TIME_MS_CAP)
                    .toArray();
    
                const documentWithCount: unknown = aggregationResults.length === 1 ? aggregationResults[0] : undefined;
                const totalDocuments =
                    documentWithCount &&
                    typeof documentWithCount === "object" &&
                    "totalDocuments" in documentWithCount &&
                    typeof documentWithCount.totalDocuments === "number"
                        ? documentWithCount.totalDocuments
                        : 0;
    
                return totalDocuments;
            }, undefined);
        }
    
        private async replaceRawValuesWithEmbeddingsIfNecessary({
            database,
            collection,
            pipeline,
        }: {
            database: string;
            collection: string;
            pipeline: Document[];
        }): Promise<Document[]> {
            for (const stage of pipeline) {
                if ("$vectorSearch" in stage) {
                    const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;
    
                    if (Array.isArray(vectorSearchStage.queryVector)) {
                        continue;
                    }
    
                    if (!vectorSearchStage.embeddingParameters) {
                        throw new MongoDBError(
                            ErrorCodes.AtlasVectorSearchInvalidQuery,
                            "embeddingModel is mandatory if queryVector is a raw string."
                        );
                    }
    
                    const embeddingParameters = vectorSearchStage.embeddingParameters;
                    delete vectorSearchStage.embeddingParameters;
    
                    await this.session.vectorSearchEmbeddingsManager.assertVectorSearchIndexExists({
                        database,
                        collection,
                        path: vectorSearchStage.path,
                    });
    
                    const [embeddings] = await this.session.vectorSearchEmbeddingsManager.generateEmbeddings({
                        rawValues: [vectorSearchStage.queryVector],
                        embeddingParameters,
                        inputType: "query",
                    });
    
                    if (!embeddings) {
                        throw new MongoDBError(
                            ErrorCodes.AtlasVectorSearchInvalidQuery,
                            "Failed to generate embeddings for the query vector."
                        );
                    }
    
                // $vectorSearch.queryVector can be a BSON.Binary, which is neither a number nor an array.
                // That's not exactly valid from the LLM perspective (LLMs can't provide binaries).
                // That's why we overwrite the stage in an untyped way: what we expose and what LLMs can use differ.
                    vectorSearchStage.queryVector = embeddings as string | number[];
                }
            }
    
            await this.session.vectorSearchEmbeddingsManager.assertFieldsHaveCorrectEmbeddings(
                { database, collection },
                pipeline
            );
    
            return pipeline;
        }
    
        private async isVectorSearchIndexUsed({
            database,
            collection,
            pipeline,
        }: {
            database: string;
            collection: string;
            pipeline: Document[];
        }): Promise<["valid-index" | "non-existent-index" | "not-vector-search-query", string?]> {
            // check if the pipeline contains a $vectorSearch stage
            let usesVectorSearch = false;
            let indexName: string = "default";
    
            for (const stage of pipeline) {
                if ("$vectorSearch" in stage) {
                    const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;
                    usesVectorSearch = true;
                    indexName = vectorSearchStage.index;
                    break;
                }
            }
    
            if (!usesVectorSearch) {
                return ["not-vector-search-query"];
            }
    
            const indexExists = await this.session.vectorSearchEmbeddingsManager.indexExists({
                database,
                collection,
                indexName,
            });
    
            return [indexExists ? "valid-index" : "non-existent-index", indexName];
        }
    
        private generateMessage({
            aggResultsCount,
            documents,
            appliedLimits,
        }: {
            aggResultsCount: number | undefined;
            documents: unknown[];
            appliedLimits: (keyof typeof CURSOR_LIMITS_TO_LLM_TEXT)[];
        }): string {
            const appliedLimitText = appliedLimits.length
                ? `\
    while respecting the applied limits of ${appliedLimits.map((limit) => CURSOR_LIMITS_TO_LLM_TEXT[limit]).join(", ")}. \
    Note to LLM: If the entire query result is required then use "export" tool to export the query results.\
    `
                : "";
    
            return `\
    The aggregation resulted in ${aggResultsCount === undefined ? "indeterminable number of" : aggResultsCount} documents. \
    Returning ${documents.length} documents${appliedLimitText ? ` ${appliedLimitText}` : "."}\
    `;
        }
    
        protected resolveTelemetryMetadata(
            args: ToolArgs<typeof this.argsShape>,
            { result }: { result: CallToolResult }
        ): ConnectionMetadata | AutoEmbeddingsUsageMetadata {
            const [maybeVectorStage] = args.pipeline;
            if (
                maybeVectorStage !== null &&
                maybeVectorStage instanceof Object &&
                "$vectorSearch" in maybeVectorStage &&
                "embeddingParameters" in maybeVectorStage["$vectorSearch"] &&
                this.config.voyageApiKey
            ) {
                return {
                    ...super.resolveTelemetryMetadata(args, { result }),
                    embeddingsGeneratedBy: "mcp",
                };
            } else {
                return super.resolveTelemetryMetadata(args, { result });
            }
        }
    }
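The result-capping step inside `execute` can be isolated into a small sketch. The config value below is a hypothetical stand-in for `this.config.maxDocumentsPerQuery`:

```typescript
// Minimal sketch of the result-capping logic from execute(); the config
// value here is an assumed example, not the server's actual default.
const maxDocumentsPerQuery = 100;
const pipeline: Record<string, unknown>[] = [{ $match: { active: true } }];

// Copy the pipeline so the original can still be used for the count query.
const cappedResultsPipeline = [...pipeline];
if (maxDocumentsPerQuery > 0) {
  // Appending $limit keeps the server from materializing unbounded results.
  cappedResultsPipeline.push({ $limit: maxDocumentsPerQuery });
}

console.log(cappedResultsPipeline.length); // original stage plus the $limit cap
```

Copying before appending matters: the uncapped `pipeline` is reused by `countAggregationResultDocuments` to compute the true total, which is how the tool detects that results were capped.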
  • Defines the input schema (argsShape) for the aggregate tool, including the pipeline array with support for vectorSearch stages and optional responseBytesLimit.
    export const getAggregateArgs = (vectorSearchEnabled: boolean) =>
        ({
            pipeline: z
                .array(vectorSearchEnabled ? z.union([VectorSearchStage, AnyAggregateStage]) : AnyAggregateStage)
                .describe(vectorSearchEnabled ? pipelineDescriptionWithVectorSearch : genericPipelineDescription),
            responseBytesLimit: z.number().optional().default(ONE_MB).describe(`\
    The maximum number of bytes to return in the response. This value is capped by the server's configured maxBytesPerQuery and cannot be exceeded. \
    Note to LLM: If the entire aggregation result is required, use the "export" tool instead of increasing this limit.\
    `),
        }) as const;
  • Zod schemas for aggregation stages, including AnyAggregateStage and VectorSearchStage used in the aggregate tool's pipeline validation.
    export const AnyAggregateStage = zEJSON();
    export const VectorSearchStage = z.object({
        $vectorSearch: z
            .object({
                exact: z
                    .boolean()
                    .optional()
                    .default(false)
                    .describe(
                        "When true, uses an ENN algorithm, otherwise uses ANN. Using ENN is not compatible with numCandidates, in that case, numCandidates must be left empty."
                    ),
                index: z.string().describe("Name of the index, as retrieved from the `collection-indexes` tool."),
                path: z
                    .string()
                    .describe(
                        "Field, in dot notation, where to search. There must be a vector search index for that field. Note to LLM: When unsure, use the 'collection-indexes' tool to validate that the field is indexed with a vector search index."
                    ),
                queryVector: z
                    .union([z.string(), z.array(z.number())])
                    .describe(
                        "The content to search for. The embeddingParameters field is mandatory if the queryVector is a string, in that case, the tool generates the embedding automatically using the provided configuration."
                    ),
                numCandidates: z
                    .number()
                    .int()
                    .positive()
                    .optional()
                    .describe("Number of candidates for the ANN algorithm. Mandatory when exact is false."),
                limit: z.number().int().positive().optional().default(10),
                filter: zEJSON()
                    .optional()
                    .describe(
                        "MQL filter that can only use filter fields from the index definition. Note to LLM: If unsure, use the `collection-indexes` tool to learn which fields can be used for filtering."
                    ),
                embeddingParameters: zSupportedEmbeddingParameters
                    .optional()
                    .describe(
                        "The embedding model and its parameters to use to generate embeddings before searching. It is mandatory if queryVector is a string value. Note to LLM: If unsure, ask the user before providing one."
                    ),
            })
            .passthrough(),
    });
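A concrete stage matching this schema might look like the following. The index name, field path, and `embeddingParameters` shape are illustrative assumptions, not values defined by the server:

```typescript
// Hypothetical $vectorSearch stage conforming to the schema above.
// The index name, path, and embeddingParameters are assumed examples.
const vectorSearchStage = {
  $vectorSearch: {
    exact: false, // ANN search, so numCandidates is required
    index: "plot_embedding_index", // assumed index name
    path: "plot_embedding", // assumed indexed field, in dot notation
    // A raw string queryVector requires embeddingParameters so the tool
    // can generate the embedding itself before running the search.
    queryVector: "a heist set on a space station",
    embeddingParameters: { model: "voyage-3" }, // assumed parameter shape
    numCandidates: 100,
    limit: 10,
  },
};

console.log(Array.isArray(vectorSearchStage.$vectorSearch.queryVector)); // false
```

Because `queryVector` is a string here, the tool's `replaceRawValuesWithEmbeddingsIfNecessary` step would generate the embedding and overwrite the field before executing the pipeline.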
  • Re-exports the AggregateTool for inclusion in the MongoDB tools module.
    export { AggregateTool } from "./read/aggregate.js";
  • Defines AllTools array which includes all MongoDB tools (via MongoDbTools), used by the server to register tools including AggregateTool.
    export const AllTools: ToolClass[] = Object.values({
        ...MongoDbTools,
        ...AtlasTools,
        ...AtlasLocalTools,
    });
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

The description adds valuable behavioral context beyond the annotations: it mentions the responseBytesLimit constraint and server-side capping, plus the alternative 'export' tool for larger results. While annotations already declare readOnlyHint=true and destructiveHint=false, the description provides practical implementation details about result size limitations that aren't captured in structured fields.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is extremely concise: a single sentence that directly states the tool's purpose without unnecessary elaboration. Every word earns its place, and the structure is front-loaded with the core functionality. This is a model of efficiency in description writing.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

For a read-only aggregation tool with good annotations and full schema coverage, the description is minimally complete. It covers the basic purpose and hints at result size limitations, but lacks information about return format, error conditions, or performance characteristics. Without an output schema, more guidance about what the aggregation returns would be helpful for agent understanding.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

With 100% schema description coverage, the input schema already documents all parameters thoroughly. The description doesn't add meaningful parameter semantics beyond what's in the schema: it mentions aggregation generally but provides no additional context about pipeline construction, database/collection selection, or response limit implications beyond what the schema descriptions already state.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the action ('Run an aggregation') and target ('against a MongoDB collection'), providing a specific verb+resource combination. However, it doesn't explicitly differentiate this tool from sibling tools like 'find' or 'count' which also query MongoDB collections, missing an opportunity for clearer distinction.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 3/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides some implied usage context through the responseBytesLimit parameter note about using the 'export' tool for larger results, but lacks explicit guidance on when to choose this tool over alternatives like 'find' or 'count'. No clear when/when-not instructions or sibling tool comparisons are provided.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
