Skip to main content
Glama
manpreet2000

MCP Database Server

aggregate

Process MongoDB data by applying multiple operations like filtering, grouping, and sorting in a pipeline to transform and analyze database collections.

Input Schema

TableJSON Schema
| Name | Required | Description | Default |
|------|----------|-------------|---------|
| collectionName | Yes | | |
| pipeline | Yes | | |
| options | No | | |

Implementation Reference

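  • Handler function that executes the MongoDB aggregation: connects to the database if no connection exists, runs collection.aggregate(pipeline, options) with a default maxTimeMS of 30000 (which options may override), and returns the result array as a JSON string; on failure it logs the error and returns it as text.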
    /**
     * Tool handler for "aggregate": runs an aggregation pipeline on one collection.
     *
     * @param collectionName - name of the collection to aggregate over
     * @param pipeline - array of aggregation stage objects
     * @param options - optional aggregate options; spread after the default so a
     *   caller-supplied maxTimeMS overrides the 30000 ms default
     * @returns an MCP tool result: the JSON-stringified result array on success,
     *   or a text error message with `isError: true` on failure
     */
    async ({ collectionName, pipeline, options }) => {
      try {
        // Connect lazily: only when no database handle is available yet.
        let db = mongodbConnection.getDb();
        if (!db) {
          await mongodbConnection.connect(this.MONGODB_URI);
          db = mongodbConnection.getDb();
          if (!db) throw new Error("Failed to connect to database");
        }

        const result = await db
          .collection(collectionName)
          .aggregate(pipeline, { maxTimeMS: 30000, ...options })
          .toArray();

        return {
          content: [{ type: "text", text: JSON.stringify(result) }],
        };
      } catch (error) {
        console.error(error);
        // Use the message alone for Error instances: concatenating the Error
        // object itself produced a doubled prefix ("Error: Error: ...").
        const message = error instanceof Error ? error.message : String(error);
        return {
          // Flag the result as a failure so clients need not parse the text.
          isError: true,
          content: [{ type: "text", text: `Error: ${message}` }],
        };
      }
    }
    );
  • Zod input schema for aggregate tool: collectionName (string), pipeline (array of MongoDB aggregation stages with validation ensuring exactly one stage type per object), optional options.
    // Input schema for the "aggregate" tool.
    // - collectionName: target collection.
    // - pipeline: array of stage objects; each must define exactly one of the
    //   stage operators listed below.
    // - options: free-form aggregate options passed through to the driver.
    // NOTE(review): $out and $merge write to a collection — confirm that write
    // stages are intended to be reachable through this tool.
    {
      collectionName: z.string(),
      pipeline: z.array(
        z
          .object({
            $match: z.record(z.any()).optional(),
            $group: z.record(z.any()).optional(),
            $project: z.record(z.any()).optional(),
            $sort: z.record(z.any()).optional(),
            $limit: z.number().optional(),
            $skip: z.number().optional(),
            // $unwind takes either a field path string or a document
            // ({ path, includeArrayIndex, preserveNullAndEmptyArrays }).
            $unwind: z.union([z.string(), z.record(z.any())]).optional(),
            $lookup: z
              .object({
                from: z.string(),
                localField: z.string(),
                foreignField: z.string(),
                as: z.string(),
              })
              .optional(),
            $count: z.string().optional(),
            $addFields: z.record(z.any()).optional(),
            $replaceRoot: z.record(z.any()).optional(),
            $facet: z.record(z.any()).optional(),
            $bucket: z.record(z.any()).optional(),
            $geoNear: z.record(z.any()).optional(),
            $indexStats: z.record(z.any()).optional(),
            $listLocalSessions: z.record(z.any()).optional(),
            $listSessions: z.record(z.any()).optional(),
            $merge: z.record(z.any()).optional(),
            $out: z.string().optional(),
            $planCacheStats: z.record(z.any()).optional(),
            $redact: z.record(z.any()).optional(),
            $replaceWith: z.record(z.any()).optional(),
            $sample: z.object({ size: z.number() }).optional(),
            $search: z.record(z.any()).optional(),
            $searchMeta: z.record(z.any()).optional(),
            $set: z.record(z.any()).optional(),
            $setWindowFields: z.record(z.any()).optional(),
            $unionWith: z.record(z.any()).optional(),
            // $unset takes a single field name or an array of field names.
            $unset: z.union([z.string(), z.array(z.string())]).optional(),
          })
          .refine(
            // A stage is valid only when exactly one operator is defined.
            (stage) =>
              Object.values(stage).filter((value) => value !== undefined)
                .length === 1,
            {
              message: "Each pipeline stage must contain exactly one field",
            }
          )
      ),
      // z.object({}) strips every unknown key, which silently discarded all
      // caller-supplied options; a record keeps them.
      options: z.record(z.any()).optional(),
    },
  • src/index.ts:151-237 (registration)
    Full registration of the 'aggregate' tool using this.mcpServer.tool(name, schema, handler), including inline schema and handler.
    // Registers the "aggregate" tool: validates the input against the schema
    // below, then runs the pipeline on the named collection.
    // NOTE(review): $out and $merge write to a collection — confirm that write
    // stages are intended to be reachable through this tool.
    this.mcpServer.tool(
      "aggregate",
      {
        collectionName: z.string(),
        // Each stage object must define exactly one of the operators below.
        pipeline: z.array(
          z
            .object({
              $match: z.record(z.any()).optional(),
              $group: z.record(z.any()).optional(),
              $project: z.record(z.any()).optional(),
              $sort: z.record(z.any()).optional(),
              $limit: z.number().optional(),
              $skip: z.number().optional(),
              // $unwind takes either a field path string or a document
              // ({ path, includeArrayIndex, preserveNullAndEmptyArrays }).
              $unwind: z.union([z.string(), z.record(z.any())]).optional(),
              $lookup: z
                .object({
                  from: z.string(),
                  localField: z.string(),
                  foreignField: z.string(),
                  as: z.string(),
                })
                .optional(),
              $count: z.string().optional(),
              $addFields: z.record(z.any()).optional(),
              $replaceRoot: z.record(z.any()).optional(),
              $facet: z.record(z.any()).optional(),
              $bucket: z.record(z.any()).optional(),
              $geoNear: z.record(z.any()).optional(),
              $indexStats: z.record(z.any()).optional(),
              $listLocalSessions: z.record(z.any()).optional(),
              $listSessions: z.record(z.any()).optional(),
              $merge: z.record(z.any()).optional(),
              $out: z.string().optional(),
              $planCacheStats: z.record(z.any()).optional(),
              $redact: z.record(z.any()).optional(),
              $replaceWith: z.record(z.any()).optional(),
              $sample: z.object({ size: z.number() }).optional(),
              $search: z.record(z.any()).optional(),
              $searchMeta: z.record(z.any()).optional(),
              $set: z.record(z.any()).optional(),
              $setWindowFields: z.record(z.any()).optional(),
              $unionWith: z.record(z.any()).optional(),
              // $unset takes a single field name or an array of field names.
              $unset: z.union([z.string(), z.array(z.string())]).optional(),
            })
            .refine(
              // A stage is valid only when exactly one operator is defined.
              (stage) =>
                Object.values(stage).filter((value) => value !== undefined)
                  .length === 1,
              {
                message: "Each pipeline stage must contain exactly one field",
              }
            )
        ),
        // z.object({}) strips every unknown key, which silently discarded all
        // caller-supplied options; a record keeps them.
        options: z.record(z.any()).optional(),
      },
      async ({ collectionName, pipeline, options }) => {
        try {
          // Connect lazily: only when no database handle is available yet.
          let db = mongodbConnection.getDb();
          if (!db) {
            await mongodbConnection.connect(this.MONGODB_URI);
            db = mongodbConnection.getDb();
            if (!db) throw new Error("Failed to connect to database");
          }

          // options is spread last so a caller-supplied maxTimeMS overrides
          // the 30000 ms default.
          const result = await db
            .collection(collectionName)
            .aggregate(pipeline, { maxTimeMS: 30000, ...options })
            .toArray();

          return {
            content: [{ type: "text", text: JSON.stringify(result) }],
          };
        } catch (error) {
          console.error(error);
          // Use the message alone for Error instances: concatenating the Error
          // object itself produced a doubled prefix ("Error: Error: ...").
          const message =
            error instanceof Error ? error.message : String(error);
          return {
            // Flag the result as a failure so clients need not parse the text.
            isError: true,
            content: [{ type: "text", text: `Error: ${message}` }],
          };
        }
      }
    );

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/manpreet2000/mcp-database-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.