Skip to main content
Glama
manpreet2000

MCP Database Server

aggregate

Process MongoDB data through aggregation pipelines to filter, group, transform, and analyze documents in collections for reporting and insights.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
collectionNameYes
pipelineYes
optionsNo

Implementation Reference

  • src/index.ts:151-237 (registration)
    Registration of the 'aggregate' tool using mcpServer.tool, including schema and handler function
    this.mcpServer.tool( "aggregate", { collectionName: z.string(), pipeline: z.array( z .object({ $match: z.record(z.any()).optional(), $group: z.record(z.any()).optional(), $project: z.record(z.any()).optional(), $sort: z.record(z.any()).optional(), $limit: z.number().optional(), $skip: z.number().optional(), $unwind: z.string().optional(), $lookup: z .object({ from: z.string(), localField: z.string(), foreignField: z.string(), as: z.string(), }) .optional(), $count: z.string().optional(), $addFields: z.record(z.any()).optional(), $replaceRoot: z.record(z.any()).optional(), $facet: z.record(z.any()).optional(), $bucket: z.record(z.any()).optional(), $geoNear: z.record(z.any()).optional(), $indexStats: z.record(z.any()).optional(), $listLocalSessions: z.record(z.any()).optional(), $listSessions: z.record(z.any()).optional(), $merge: z.record(z.any()).optional(), $out: z.string().optional(), $planCacheStats: z.record(z.any()).optional(), $redact: z.record(z.any()).optional(), $replaceWith: z.record(z.any()).optional(), $sample: z.object({ size: z.number() }).optional(), $search: z.record(z.any()).optional(), $searchMeta: z.record(z.any()).optional(), $set: z.record(z.any()).optional(), $setWindowFields: z.record(z.any()).optional(), $unionWith: z.record(z.any()).optional(), $unset: z.string().optional(), }) .refine( (obj) => { // Count the number of defined fields const definedFields = Object.keys(obj).filter( (key) => obj[key as keyof typeof obj] !== undefined ); return definedFields.length === 1; }, { message: "Each pipeline stage must contain exactly one field", } ) ), options: z.object({}).optional(), }, async ({ collectionName, pipeline, options }) => { try { let db = mongodbConnection.getDb(); if (!db) { await mongodbConnection.connect(this.MONGODB_URI); db = mongodbConnection.getDb(); if (!db) throw new Error("Failed to connect to database"); } const collection = db.collection(collectionName); const result = await collection .aggregate(pipeline, { maxTimeMS: 30000, ...options }) .toArray(); return { content: [ { type: "text", text: JSON.stringify(result), }, ], }; } catch (error) { console.error(error); return { content: [{ type: "text", text: "Error: " + error }], }; } } );
  • Zod input schema for the 'aggregate' tool defining collectionName, pipeline array with various MongoDB aggregation stages (refined to exactly one stage per object), and optional options
    { collectionName: z.string(), pipeline: z.array( z .object({ $match: z.record(z.any()).optional(), $group: z.record(z.any()).optional(), $project: z.record(z.any()).optional(), $sort: z.record(z.any()).optional(), $limit: z.number().optional(), $skip: z.number().optional(), $unwind: z.string().optional(), $lookup: z .object({ from: z.string(), localField: z.string(), foreignField: z.string(), as: z.string(), }) .optional(), $count: z.string().optional(), $addFields: z.record(z.any()).optional(), $replaceRoot: z.record(z.any()).optional(), $facet: z.record(z.any()).optional(), $bucket: z.record(z.any()).optional(), $geoNear: z.record(z.any()).optional(), $indexStats: z.record(z.any()).optional(), $listLocalSessions: z.record(z.any()).optional(), $listSessions: z.record(z.any()).optional(), $merge: z.record(z.any()).optional(), $out: z.string().optional(), $planCacheStats: z.record(z.any()).optional(), $redact: z.record(z.any()).optional(), $replaceWith: z.record(z.any()).optional(), $sample: z.object({ size: z.number() }).optional(), $search: z.record(z.any()).optional(), $searchMeta: z.record(z.any()).optional(), $set: z.record(z.any()).optional(), $setWindowFields: z.record(z.any()).optional(), $unionWith: z.record(z.any()).optional(), $unset: z.string().optional(), }) .refine( (obj) => { // Count the number of defined fields const definedFields = Object.keys(obj).filter( (key) => obj[key as keyof typeof obj] !== undefined ); return definedFields.length === 1; }, { message: "Each pipeline stage must contain exactly one field", } ) ), options: z.object({}).optional(), },
  • Handler function for 'aggregate' tool: connects to MongoDB if needed, gets collection, runs aggregate pipeline with options and maxTimeMS 30s, returns JSON stringified results or error
    async ({ collectionName, pipeline, options }) => { try { let db = mongodbConnection.getDb(); if (!db) { await mongodbConnection.connect(this.MONGODB_URI); db = mongodbConnection.getDb(); if (!db) throw new Error("Failed to connect to database"); } const collection = db.collection(collectionName); const result = await collection .aggregate(pipeline, { maxTimeMS: 30000, ...options }) .toArray(); return { content: [ { type: "text", text: JSON.stringify(result), }, ], }; } catch (error) { console.error(error); return { content: [{ type: "text", text: "Error: " + error }], }; } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/manpreet2000/mcp-database-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server