aggregate
Process MongoDB data by applying multiple operations like filtering, grouping, and sorting in a pipeline to transform and analyze database collections.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
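| collectionName | Yes | Name of the collection to aggregate (string). | |
| pipeline | Yes | Array of aggregation stage objects; each object must contain exactly one stage field (e.g. `$match`, `$group`). | |
| options | No | Optional options object; the handler spreads it over its own `{ maxTimeMS: 30000 }` when calling `aggregate`. | |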
Implementation Reference
- src/index.ts:210-237 (handler) — Handler function that executes the MongoDB aggregation: connects to the database if needed, runs `collection.aggregate(pipeline, options)`, and returns the JSON-stringified result.

  ```ts
  async ({ collectionName, pipeline, options }) => {
    try {
      let db = mongodbConnection.getDb();
      if (!db) {
        await mongodbConnection.connect(this.MONGODB_URI);
        db = mongodbConnection.getDb();
        if (!db) throw new Error("Failed to connect to database");
      }
      const collection = db.collection(collectionName);
      const result = await collection
        .aggregate(pipeline, { maxTimeMS: 30000, ...options })
        .toArray();
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify(result),
          },
        ],
      };
    } catch (error) {
      console.error(error);
      return {
        content: [{ type: "text", text: "Error: " + error }],
      };
    }
  }
  );
  ```
- src/index.ts:153-209 (schema) — Zod input schema for the aggregate tool: `collectionName` (string), `pipeline` (array of MongoDB aggregation stages, with validation ensuring exactly one stage type per object), and optional `options`.

  ```ts
  {
    collectionName: z.string(),
    pipeline: z.array(
      z
        .object({
          $match: z.record(z.any()).optional(),
          $group: z.record(z.any()).optional(),
          $project: z.record(z.any()).optional(),
          $sort: z.record(z.any()).optional(),
          $limit: z.number().optional(),
          $skip: z.number().optional(),
          $unwind: z.string().optional(),
          $lookup: z
            .object({
              from: z.string(),
              localField: z.string(),
              foreignField: z.string(),
              as: z.string(),
            })
            .optional(),
          $count: z.string().optional(),
          $addFields: z.record(z.any()).optional(),
          $replaceRoot: z.record(z.any()).optional(),
          $facet: z.record(z.any()).optional(),
          $bucket: z.record(z.any()).optional(),
          $geoNear: z.record(z.any()).optional(),
          $indexStats: z.record(z.any()).optional(),
          $listLocalSessions: z.record(z.any()).optional(),
          $listSessions: z.record(z.any()).optional(),
          $merge: z.record(z.any()).optional(),
          $out: z.string().optional(),
          $planCacheStats: z.record(z.any()).optional(),
          $redact: z.record(z.any()).optional(),
          $replaceWith: z.record(z.any()).optional(),
          $sample: z.object({ size: z.number() }).optional(),
          $search: z.record(z.any()).optional(),
          $searchMeta: z.record(z.any()).optional(),
          $set: z.record(z.any()).optional(),
          $setWindowFields: z.record(z.any()).optional(),
          $unionWith: z.record(z.any()).optional(),
          $unset: z.string().optional(),
        })
        .refine(
          (obj) => {
            // Count the number of defined fields
            const definedFields = Object.keys(obj).filter(
              (key) => obj[key as keyof typeof obj] !== undefined
            );
            return definedFields.length === 1;
          },
          {
            message: "Each pipeline stage must contain exactly one field",
          }
        )
    ),
    options: z.object({}).optional(),
  },
  ```
- src/index.ts:151-237 (registration) — Full registration of the 'aggregate' tool using `this.mcpServer.tool(name, schema, handler)`, including the inline schema and handler.

  ```ts
  this.mcpServer.tool(
    "aggregate",
    {
      collectionName: z.string(),
      pipeline: z.array(
        z
          .object({
            $match: z.record(z.any()).optional(),
            $group: z.record(z.any()).optional(),
            $project: z.record(z.any()).optional(),
            $sort: z.record(z.any()).optional(),
            $limit: z.number().optional(),
            $skip: z.number().optional(),
            $unwind: z.string().optional(),
            $lookup: z
              .object({
                from: z.string(),
                localField: z.string(),
                foreignField: z.string(),
                as: z.string(),
              })
              .optional(),
            $count: z.string().optional(),
            $addFields: z.record(z.any()).optional(),
            $replaceRoot: z.record(z.any()).optional(),
            $facet: z.record(z.any()).optional(),
            $bucket: z.record(z.any()).optional(),
            $geoNear: z.record(z.any()).optional(),
            $indexStats: z.record(z.any()).optional(),
            $listLocalSessions: z.record(z.any()).optional(),
            $listSessions: z.record(z.any()).optional(),
            $merge: z.record(z.any()).optional(),
            $out: z.string().optional(),
            $planCacheStats: z.record(z.any()).optional(),
            $redact: z.record(z.any()).optional(),
            $replaceWith: z.record(z.any()).optional(),
            $sample: z.object({ size: z.number() }).optional(),
            $search: z.record(z.any()).optional(),
            $searchMeta: z.record(z.any()).optional(),
            $set: z.record(z.any()).optional(),
            $setWindowFields: z.record(z.any()).optional(),
            $unionWith: z.record(z.any()).optional(),
            $unset: z.string().optional(),
          })
          .refine(
            (obj) => {
              // Count the number of defined fields
              const definedFields = Object.keys(obj).filter(
                (key) => obj[key as keyof typeof obj] !== undefined
              );
              return definedFields.length === 1;
            },
            {
              message: "Each pipeline stage must contain exactly one field",
            }
          )
      ),
      options: z.object({}).optional(),
    },
    async ({ collectionName, pipeline, options }) => {
      try {
        let db = mongodbConnection.getDb();
        if (!db) {
          await mongodbConnection.connect(this.MONGODB_URI);
          db = mongodbConnection.getDb();
          if (!db) throw new Error("Failed to connect to database");
        }
        const collection = db.collection(collectionName);
        const result = await collection
          .aggregate(pipeline, { maxTimeMS: 30000, ...options })
          .toArray();
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(result),
            },
          ],
        };
      } catch (error) {
        console.error(error);
        return {
          content: [{ type: "text", text: "Error: " + error }],
        };
      }
    }
  );
  ```