aggregate
Process MongoDB data through aggregation pipelines to filter, group, transform, and analyze documents in collections for reporting and insights.
Input Schema
TableJSON Schema
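| Name | Required | Description | Default |
|---|---|---|---|
| collectionName | Yes | Name of the collection to run the aggregation on (string). | |
| pipeline | Yes | Array of aggregation stage objects; each object must contain exactly one stage field (e.g. `$match`, `$group`). | |
| options | No | Optional object (the schema declares no keys) spread into the `aggregate()` options after `maxTimeMS: 30000`. | |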
Implementation Reference
- src/index.ts:151-237 (registration) Registration of the 'aggregate' tool using mcpServer.tool, including schema and handler function:

  ```ts
  this.mcpServer.tool(
    "aggregate",
    {
      collectionName: z.string(),
      pipeline: z.array(
        z
          .object({
            $match: z.record(z.any()).optional(),
            $group: z.record(z.any()).optional(),
            $project: z.record(z.any()).optional(),
            $sort: z.record(z.any()).optional(),
            $limit: z.number().optional(),
            $skip: z.number().optional(),
            $unwind: z.string().optional(),
            $lookup: z
              .object({
                from: z.string(),
                localField: z.string(),
                foreignField: z.string(),
                as: z.string(),
              })
              .optional(),
            $count: z.string().optional(),
            $addFields: z.record(z.any()).optional(),
            $replaceRoot: z.record(z.any()).optional(),
            $facet: z.record(z.any()).optional(),
            $bucket: z.record(z.any()).optional(),
            $geoNear: z.record(z.any()).optional(),
            $indexStats: z.record(z.any()).optional(),
            $listLocalSessions: z.record(z.any()).optional(),
            $listSessions: z.record(z.any()).optional(),
            $merge: z.record(z.any()).optional(),
            $out: z.string().optional(),
            $planCacheStats: z.record(z.any()).optional(),
            $redact: z.record(z.any()).optional(),
            $replaceWith: z.record(z.any()).optional(),
            $sample: z.object({ size: z.number() }).optional(),
            $search: z.record(z.any()).optional(),
            $searchMeta: z.record(z.any()).optional(),
            $set: z.record(z.any()).optional(),
            $setWindowFields: z.record(z.any()).optional(),
            $unionWith: z.record(z.any()).optional(),
            $unset: z.string().optional(),
          })
          .refine(
            (obj) => {
              // Count the number of defined fields
              const definedFields = Object.keys(obj).filter(
                (key) => obj[key as keyof typeof obj] !== undefined
              );
              return definedFields.length === 1;
            },
            {
              message: "Each pipeline stage must contain exactly one field",
            }
          )
      ),
      options: z.object({}).optional(),
    },
    async ({ collectionName, pipeline, options }) => {
      try {
        let db = mongodbConnection.getDb();
        if (!db) {
          await mongodbConnection.connect(this.MONGODB_URI);
          db = mongodbConnection.getDb();
          if (!db) throw new Error("Failed to connect to database");
        }
        const collection = db.collection(collectionName);
        const result = await collection
          .aggregate(pipeline, { maxTimeMS: 30000, ...options })
          .toArray();
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(result),
            },
          ],
        };
      } catch (error) {
        console.error(error);
        return {
          content: [{ type: "text", text: "Error: " + error }],
        };
      }
    }
  );
  ```
- src/index.ts:153-209 (schema) Zod input schema for the 'aggregate' tool defining collectionName, pipeline array with various MongoDB aggregation stages (refined to exactly one stage per object), and optional options:

  ```ts
  {
    collectionName: z.string(),
    pipeline: z.array(
      z
        .object({
          $match: z.record(z.any()).optional(),
          $group: z.record(z.any()).optional(),
          $project: z.record(z.any()).optional(),
          $sort: z.record(z.any()).optional(),
          $limit: z.number().optional(),
          $skip: z.number().optional(),
          $unwind: z.string().optional(),
          $lookup: z
            .object({
              from: z.string(),
              localField: z.string(),
              foreignField: z.string(),
              as: z.string(),
            })
            .optional(),
          $count: z.string().optional(),
          $addFields: z.record(z.any()).optional(),
          $replaceRoot: z.record(z.any()).optional(),
          $facet: z.record(z.any()).optional(),
          $bucket: z.record(z.any()).optional(),
          $geoNear: z.record(z.any()).optional(),
          $indexStats: z.record(z.any()).optional(),
          $listLocalSessions: z.record(z.any()).optional(),
          $listSessions: z.record(z.any()).optional(),
          $merge: z.record(z.any()).optional(),
          $out: z.string().optional(),
          $planCacheStats: z.record(z.any()).optional(),
          $redact: z.record(z.any()).optional(),
          $replaceWith: z.record(z.any()).optional(),
          $sample: z.object({ size: z.number() }).optional(),
          $search: z.record(z.any()).optional(),
          $searchMeta: z.record(z.any()).optional(),
          $set: z.record(z.any()).optional(),
          $setWindowFields: z.record(z.any()).optional(),
          $unionWith: z.record(z.any()).optional(),
          $unset: z.string().optional(),
        })
        .refine(
          (obj) => {
            // Count the number of defined fields
            const definedFields = Object.keys(obj).filter(
              (key) => obj[key as keyof typeof obj] !== undefined
            );
            return definedFields.length === 1;
          },
          {
            message: "Each pipeline stage must contain exactly one field",
          }
        )
    ),
    options: z.object({}).optional(),
  },
  ```
- src/index.ts:210-236 (handler) Handler function for 'aggregate' tool: connects to MongoDB if needed, gets collection, runs aggregate pipeline with options and maxTimeMS 30s, returns JSON stringified results or error:

  ```ts
  async ({ collectionName, pipeline, options }) => {
    try {
      let db = mongodbConnection.getDb();
      if (!db) {
        await mongodbConnection.connect(this.MONGODB_URI);
        db = mongodbConnection.getDb();
        if (!db) throw new Error("Failed to connect to database");
      }
      const collection = db.collection(collectionName);
      const result = await collection
        .aggregate(pipeline, { maxTimeMS: 30000, ...options })
        .toArray();
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify(result),
          },
        ],
      };
    } catch (error) {
      console.error(error);
      return {
        content: [{ type: "text", text: "Error: " + error }],
      };
    }
  }
  ```