aggregate
Runs a read-only aggregation pipeline against a MongoDB collection to filter, group, sort, join, and reshape data. The pipeline is validated before execution, and any stage on the blocked list is rejected.
Instructions
Execute a read-only aggregation pipeline on a collection.
Supported Stages:
$match: Filter documents
$group: Group documents by a key
$sort: Sort documents
$project: Shape the output
$lookup: Perform left outer joins
$unwind: Deconstruct array fields
$addFields: Include additional or calculated fields
Unsafe/Blocked Stages:
$out: Write results to collection
$merge: Merge results into collection
$set: Set field values
$unset: Remove fields
$replaceRoot: Replace document structure
$replaceWith: Replace document
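As a rough illustration of how a caller might pre-check a pipeline before sending it, the sketch below reports which blocked stage names appear in it. The `BLOCKED_STAGES` list is copied from the `unsafeStages` array in the validator shown under Implementation Reference; the `findBlockedStages` helper and the `Stage` type are hypothetical names used only for this example and are not part of the server.

```typescript
// Deny-list copied from the validator's unsafeStages array (see Implementation Reference).
const BLOCKED_STAGES: string[] = [
  "$out", "$merge", "$set", "$unset", "$replaceRoot", "$replaceWith",
];

// Hypothetical type for a single pipeline stage.
type Stage = { [key: string]: unknown };

// Hypothetical helper: collects blocked stage names found among the
// top-level keys of each stage. Nested pipelines are not inspected.
function findBlockedStages(pipeline: Stage[]): string[] {
  const found: string[] = [];
  for (const stage of pipeline) {
    for (const key of Object.keys(stage)) {
      if (BLOCKED_STAGES.indexOf(key) !== -1) {
        found.push(key);
      }
    }
  }
  return found;
}

// A pipeline built only from supported stages yields an empty list.
const readOnlyCheck = findBlockedStages([
  { $match: { active: true } },
  { $sort: { age: -1 } },
]);

// A pipeline that ends in $out is flagged.
const writeCheck = findBlockedStages([{ $match: {} }, { $out: "archive" }]);

console.log(readOnlyCheck, writeCheck);
```

Returning the offending names, rather than a boolean, makes it easier to tell the user which stage to remove; the server itself only reports that an unsafe stage was found.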
Example - User Statistics by Role:

    use_mcp_tool with
      server_name: "mongodb",
      tool_name: "aggregate",
      arguments: {
        "collection": "users",
        "pipeline": [
          { "$match": { "active": true } },
          { "$group": { "_id": "$role", "count": { "$sum": 1 }, "avgAge": { "$avg": "$age" } }},
          { "$sort": { "count": -1 } }
        ],
        "limit": 100
      }

Example - Posts with Author Details:

    use_mcp_tool with
      server_name: "mongodb",
      tool_name: "aggregate",
      arguments: {
        "collection": "posts",
        "pipeline": [
          { "$match": { "published": true } },
          { "$lookup": { "from": "users", "localField": "authorId", "foreignField": "_id", "as": "author" }},
          { "$unwind": "$author" },
          { "$project": { "title": 1, "authorName": "$author.name", "publishDate": 1 }}
        ]
      }

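For callers that assemble these arguments in code instead of writing the JSON by hand, a small typed builder can help. The sketch below rebuilds the "User Statistics by Role" arguments; the `AggregateArgs` interface and the `statsByField` function are hypothetical and simply mirror the parameters listed in the Input Schema.

```typescript
// Hypothetical type mirroring the tool's parameters; not exported by the server.
interface AggregateArgs {
  collection: string;
  pipeline: Array<{ [key: string]: unknown }>;
  database?: string;
  limit?: number;
}

// Hypothetical builder: counts active documents per `groupField` and averages
// `avgField`, sorted by count descending, like the first example above.
function statsByField(
  collection: string,
  groupField: string,
  avgField: string,
  limit?: number
): AggregateArgs {
  const args: AggregateArgs = {
    collection,
    pipeline: [
      { $match: { active: true } },
      {
        $group: {
          _id: "$" + groupField,
          count: { $sum: 1 },
          avgValue: { $avg: "$" + avgField },
        },
      },
      { $sort: { count: -1 } },
    ],
  };
  // Only include limit when the caller supplied one.
  if (limit !== undefined) {
    args.limit = limit;
  }
  return args;
}

const userStats = statsByField("users", "role", "age", 100);
console.log(JSON.stringify(userStats, null, 2));
```

The printed object can be passed as the `arguments` value of the tool call. Note that the output field here is named `avgValue`, not `avgAge`, because the builder is generic over the averaged field.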
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| collection | Yes | Collection name | |
| database | No | Database name (optional if default database is configured) | |
| limit | No | Maximum number of documents to return (1 to 1000) | |
| pipeline | Yes | MongoDB aggregation pipeline stages (read-only operations only) | |
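The table can be turned into a client-side pre-flight check. The sketch below is one possible version, assuming the `limit` bounds of 1 and 1000 that appear in the registered `inputSchema` under Implementation Reference; the `checkAggregateArgs` function is hypothetical and is not how the server validates input.

```typescript
// Bounds taken from the registered inputSchema (minimum: 1, maximum: 1000).
const LIMIT_MIN = 1;
const LIMIT_MAX = 1000;

// Hypothetical pre-flight check: returns a list of problems,
// which is empty when the arguments match the schema.
function checkAggregateArgs(args: { [key: string]: unknown }): string[] {
  const problems: string[] = [];
  const { collection, database, pipeline, limit } = args;

  if (typeof collection !== "string") {
    problems.push("collection is required and must be a string");
  }
  if (!Array.isArray(pipeline)) {
    problems.push("pipeline is required and must be an array");
  }
  if (database !== undefined && typeof database !== "string") {
    problems.push("database must be a string when provided");
  }
  if (
    limit !== undefined &&
    (typeof limit !== "number" || limit < LIMIT_MIN || limit > LIMIT_MAX)
  ) {
    problems.push("limit must be a number between 1 and 1000");
  }
  return problems;
}

// Valid arguments produce no problems.
const accepted = checkAggregateArgs({
  collection: "users",
  pipeline: [{ $match: { active: true } }],
  limit: 100,
});

// Missing collection and an out-of-range limit produce two problems.
const rejected = checkAggregateArgs({ pipeline: [], limit: 5000 });

console.log(accepted, rejected);
```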
Implementation Reference
- src/index.ts:1154-1189 (handler): Executes MongoDB aggregation pipeline after safety validation. Applies optional limit, returns JSON results with visualization suggestions.

      case 'aggregate': {
        const { database, collection, pipeline, limit } = request.params.arguments as {
          database?: string;
          collection: string;
          pipeline: any[];
          limit?: number;
        };
        const dbName = database || this.defaultDatabase;
        if (!dbName) {
          throw new McpError(
            ErrorCode.InvalidRequest,
            'Database name is required when no default database is configured'
          );
        }

        this.validateAggregationPipeline(pipeline);

        const db = client.db(dbName);
        let aggregation = db.collection(collection).aggregate(pipeline);
        if (limit) {
          aggregation = aggregation.limit(limit);
        }
        const results = await aggregation.toArray();
        const vizHint = this.generateVisualizationHint(results);

        return {
          content: [
            {
              type: 'text',
              text:
                JSON.stringify(results, null, 2) +
                (vizHint ? `\n\nVisualization Hint:\n${vizHint}` : ''),
            },
          ],
        };
      }

- src/index.ts:460-549 (registration): Registers the 'aggregate' tool in the list of available tools, including name, description, and input schema.

      {
        name: 'aggregate',
        description: `Execute a read-only aggregation pipeline on a collection.

      Supported Stages:
      - $match: Filter documents
      - $group: Group documents by a key
      - $sort: Sort documents
      - $project: Shape the output
      - $addFields - Include additional or calculated fields
      - $lookup: Perform left outer joins
      - $unwind: Deconstruct array fields

      Unsafe/Blocked Stages:
      - $out: Write results to collection
      - $merge: Merge results into collection
      - $set: Set field values
      - $unset: Remove fields
      - $replaceRoot: Replace document structure
      - $replaceWith: Replace document

      Example - User Statistics by Role:
      use_mcp_tool with
        server_name: "mongodb",
        tool_name: "aggregate",
        arguments: {
          "collection": "users",
          "pipeline": [
            { "$match": { "active": true } },
            { "$group": { "_id": "$role", "count": { "$sum": 1 }, "avgAge": { "$avg": "$age" } }},
            { "$sort": { "count": -1 } }
          ],
          "limit": 100
        }

      Example - Posts with Author Details:
      use_mcp_tool with
        server_name: "mongodb",
        tool_name: "aggregate",
        arguments: {
          "collection": "posts",
          "pipeline": [
            { "$match": { "published": true } },
            { "$lookup": { "from": "users", "localField": "authorId", "foreignField": "_id", "as": "author" }},
            { "$unwind": "$author" },
            { "$project": { "title": 1, "authorName": "$author.name", "publishDate": 1 }}
          ]
        }`,
        inputSchema: {
          type: 'object',
          properties: {
            database: {
              type: 'string',
              description: 'Database name (optional if default database is configured)',
            },
            collection: {
              type: 'string',
              description: 'Collection name',
            },
            pipeline: {
              type: 'array',
              description: 'MongoDB aggregation pipeline stages (read-only operations only)',
              items: {
                type: 'object',
              },
            },
            limit: {
              type: 'number',
              description: 'Maximum number of documents to return (optional)',
              minimum: 1,
              maximum: 1000,
            },
          },
          required: ['collection', 'pipeline'],
        },
      },

- src/index.ts:522-546 (schema): Defines the input schema for the 'aggregate' tool, specifying parameters like collection, pipeline, limit.

      inputSchema: {
        type: 'object',
        properties: {
          database: {
            type: 'string',
            description: 'Database name (optional if default database is configured)',
          },
          collection: {
            type: 'string',
            description: 'Collection name',
          },
          pipeline: {
            type: 'array',
            description: 'MongoDB aggregation pipeline stages (read-only operations only)',
            items: {
              type: 'object',
            },
          },
          limit: {
            type: 'number',
            description: 'Maximum number of documents to return (optional)',
            minimum: 1,
            maximum: 1000,
          },
        },

- src/index.ts:161-187 (helper): Validates the aggregation pipeline to prevent unsafe (write) operations, ensuring read-only access.

      private validateAggregationPipeline(pipeline: any[]): void {
        if (!Array.isArray(pipeline)) {
          throw new McpError(
            ErrorCode.InvalidRequest,
            'Aggregation pipeline must be an array'
          );
        }

        if (pipeline.length === 0) {
          throw new McpError(
            ErrorCode.InvalidRequest,
            'Aggregation pipeline cannot be empty'
          );
        }

        const unsafeStages = ['$out', '$merge', '$set', '$unset', '$replaceRoot', '$replaceWith'];
        const unsafeStageFound = pipeline.find(stage =>
          Object.keys(stage).some(key => unsafeStages.includes(key))
        );

        if (unsafeStageFound) {
          throw new McpError(
            ErrorCode.InvalidRequest,
            'Pipeline contains unsafe stages that could modify data. Only read-only operations are allowed.'
          );
        }
      }
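
Two steps of the handler listed above are easy to isolate and test on their own: falling back to the default database, and building the response text. The sketch below restates them as stand-alone functions. The names `resolveDatabase` and `formatResponseText` are hypothetical, a plain `Error` stands in for `McpError`, and the visualization hint is assumed to be a string, which the source does not confirm.

```typescript
// Hypothetical stand-alone version of the handler's database fallback.
// A plain Error stands in for the McpError thrown by the real handler.
function resolveDatabase(
  requested: string | undefined,
  defaultDatabase: string | undefined
): string {
  const dbName = requested || defaultDatabase;
  if (!dbName) {
    throw new Error(
      "Database name is required when no default database is configured"
    );
  }
  return dbName;
}

// Hypothetical stand-alone version of the response text construction.
// Assumes the visualization hint is a string; an empty hint adds nothing.
function formatResponseText(results: unknown[], vizHint: string): string {
  return (
    JSON.stringify(results, null, 2) +
    (vizHint ? `\n\nVisualization Hint:\n${vizHint}` : "")
  );
}

// An explicit database wins over the configured default.
const explicitDb = resolveDatabase("analytics", "app");
// With no explicit database, the default is used.
const fallbackDb = resolveDatabase(undefined, "app");

const withHint = formatResponseText([{ _id: "admin", count: 3 }], "bar chart");
console.log(explicitDb, fallbackDb);
console.log(withHint);
```

Keeping these steps as pure functions is a design choice for this example only; in the handler they run inline before and after the call to `aggregate`.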