Skip to main content
Glama
manpreet2000

MCP Database Server

aggregate

Process MongoDB data by applying multiple operations like filtering, grouping, and sorting in a pipeline to transform and analyze database collections.

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| collectionName | Yes | | |
| pipeline | Yes | | |
| options | No | | |

Implementation Reference

  • Handler function that executes MongoDB aggregation: connects to DB if needed, runs collection.aggregate(pipeline, options), returns JSON stringified result.
      /**
       * Handler for the "aggregate" tool: runs an aggregation pipeline against
       * one collection and returns the resulting documents as a JSON string.
       *
       * A database handle is obtained from `mongodbConnection`; if none exists
       * yet, a connection is opened with `this.MONGODB_URI` first. Failures are
       * logged and reported back as a text result flagged with `isError: true`
       * instead of being thrown.
       *
       * @param collectionName - Name of the collection to aggregate over.
       * @param pipeline - Aggregation stages handed to `collection.aggregate`.
       * @param options - Optional aggregate options; spread after the default
       *   `maxTimeMS: 30000`, so a caller-supplied `maxTimeMS` takes precedence.
       * @returns A tool result with a single text content item.
       */
      async ({ collectionName, pipeline, options }) => {
        try {
          let db = mongodbConnection.getDb();
          if (!db) {
            // Lazy connect: no handle is available until the first successful connect.
            await mongodbConnection.connect(this.MONGODB_URI);
            db = mongodbConnection.getDb();
            if (!db) throw new Error("Failed to connect to database");
          }
          const collection = db.collection(collectionName);
          const result = await collection
            .aggregate(pipeline, { maxTimeMS: 30000, ...options })
            .toArray();
          return {
            content: [
              {
                type: "text",
                text: JSON.stringify(result),
              },
            ],
          };
        } catch (error) {
          console.error(error);
          return {
            // Mark the result as a tool-execution failure so clients do not
            // mistake the error message for aggregation output.
            isError: true,
            content: [{ type: "text", text: "Error: " + error }],
          };
        }
      }
    );
  • Zod input schema for aggregate tool: collectionName (string), pipeline (array of MongoDB aggregation stages with validation ensuring exactly one stage type per object), optional options.
    // Input schema for the "aggregate" tool.
    //   collectionName - collection to run the pipeline against.
    //   pipeline       - array of stage objects; each must hold exactly one
    //                    supported stage operator.
    //   options        - optional free-form options object forwarded to the
    //                    aggregate call.
    {
      collectionName: z.string(),
      pipeline: z.array(
        z
          .object({
            $match: z.record(z.any()).optional(),
            $group: z.record(z.any()).optional(),
            $project: z.record(z.any()).optional(),
            $sort: z.record(z.any()).optional(),
            // MongoDB requires a positive integer for $limit and a
            // non-negative integer for $skip; reject bad values up front.
            $limit: z.number().int().positive().optional(),
            $skip: z.number().int().nonnegative().optional(),
            // $unwind accepts either a field path or the document form.
            $unwind: z
              .union([
                z.string(),
                z.object({
                  path: z.string(),
                  includeArrayIndex: z.string().optional(),
                  preserveNullAndEmptyArrays: z.boolean().optional(),
                }),
              ])
              .optional(),
            // Covers both the equality-match form (localField/foreignField)
            // and the sub-pipeline form (let/pipeline). Listing the keys is
            // required because unknown keys in a z.object are stripped.
            $lookup: z
              .object({
                from: z.string(),
                localField: z.string().optional(),
                foreignField: z.string().optional(),
                let: z.record(z.any()).optional(),
                pipeline: z.array(z.record(z.any())).optional(),
                as: z.string(),
              })
              .optional(),
            $count: z.string().optional(),
            $addFields: z.record(z.any()).optional(),
            $replaceRoot: z.record(z.any()).optional(),
            $facet: z.record(z.any()).optional(),
            $bucket: z.record(z.any()).optional(),
            $bucketAuto: z.record(z.any()).optional(),
            $collStats: z.record(z.any()).optional(),
            $geoNear: z.record(z.any()).optional(),
            $graphLookup: z.record(z.any()).optional(),
            $indexStats: z.record(z.any()).optional(),
            $listLocalSessions: z.record(z.any()).optional(),
            $listSessions: z.record(z.any()).optional(),
            $merge: z.record(z.any()).optional(),
            // $out accepts a collection name or a { db, coll } document.
            $out: z
              .union([
                z.string(),
                z.object({ db: z.string(), coll: z.string() }),
              ])
              .optional(),
            $planCacheStats: z.record(z.any()).optional(),
            $redact: z.record(z.any()).optional(),
            // The replacement may be a document or an expression such as "$field".
            $replaceWith: z.union([z.string(), z.record(z.any())]).optional(),
            $sample: z.object({ size: z.number() }).optional(),
            $search: z.record(z.any()).optional(),
            $searchMeta: z.record(z.any()).optional(),
            $set: z.record(z.any()).optional(),
            $setWindowFields: z.record(z.any()).optional(),
            $sortByCount: z.union([z.string(), z.record(z.any())]).optional(),
            $unionWith: z.record(z.any()).optional(),
            // $unset accepts a single field name or a list of field names.
            $unset: z.union([z.string(), z.array(z.string())]).optional(),
          })
          // Without strict(), an unsupported stage key is silently stripped:
          // it either yields a confusing "exactly one field" failure or, when
          // paired with a supported key, is dropped from the pipeline unnoticed.
          .strict()
          .refine(
            (obj) => {
              // Count the stage operators that are actually present.
              const definedFields = Object.keys(obj).filter(
                (key) => obj[key as keyof typeof obj] !== undefined
              );
              return definedFields.length === 1;
            },
            {
              message: "Each pipeline stage must contain exactly one field",
            }
          )
      ),
      // z.object({}) would strip every key, so no option could ever reach the
      // driver; a record keeps whatever options the caller supplies.
      options: z.record(z.any()).optional(),
    },
  • src/index.ts:151-237 (registration)
    Full registration of the 'aggregate' tool using this.mcpServer.tool(name, schema, handler), including inline schema and handler.
    // Registers the "aggregate" tool: validates the input against the inline
    // schema, then runs the pipeline on the requested collection and returns
    // the documents as a JSON string (or an error result on failure).
    this.mcpServer.tool(
      "aggregate",
      {
        collectionName: z.string(),
        pipeline: z.array(
          z
            .object({
              $match: z.record(z.any()).optional(),
              $group: z.record(z.any()).optional(),
              $project: z.record(z.any()).optional(),
              $sort: z.record(z.any()).optional(),
              // MongoDB requires a positive integer for $limit and a
              // non-negative integer for $skip; reject bad values up front.
              $limit: z.number().int().positive().optional(),
              $skip: z.number().int().nonnegative().optional(),
              // $unwind accepts either a field path or the document form.
              $unwind: z
                .union([
                  z.string(),
                  z.object({
                    path: z.string(),
                    includeArrayIndex: z.string().optional(),
                    preserveNullAndEmptyArrays: z.boolean().optional(),
                  }),
                ])
                .optional(),
              // Covers both the equality-match form (localField/foreignField)
              // and the sub-pipeline form (let/pipeline). Listing the keys is
              // required because unknown keys in a z.object are stripped.
              $lookup: z
                .object({
                  from: z.string(),
                  localField: z.string().optional(),
                  foreignField: z.string().optional(),
                  let: z.record(z.any()).optional(),
                  pipeline: z.array(z.record(z.any())).optional(),
                  as: z.string(),
                })
                .optional(),
              $count: z.string().optional(),
              $addFields: z.record(z.any()).optional(),
              $replaceRoot: z.record(z.any()).optional(),
              $facet: z.record(z.any()).optional(),
              $bucket: z.record(z.any()).optional(),
              $bucketAuto: z.record(z.any()).optional(),
              $collStats: z.record(z.any()).optional(),
              $geoNear: z.record(z.any()).optional(),
              $graphLookup: z.record(z.any()).optional(),
              $indexStats: z.record(z.any()).optional(),
              $listLocalSessions: z.record(z.any()).optional(),
              $listSessions: z.record(z.any()).optional(),
              $merge: z.record(z.any()).optional(),
              // $out accepts a collection name or a { db, coll } document.
              $out: z
                .union([
                  z.string(),
                  z.object({ db: z.string(), coll: z.string() }),
                ])
                .optional(),
              $planCacheStats: z.record(z.any()).optional(),
              $redact: z.record(z.any()).optional(),
              // The replacement may be a document or an expression such as "$field".
              $replaceWith: z
                .union([z.string(), z.record(z.any())])
                .optional(),
              $sample: z.object({ size: z.number() }).optional(),
              $search: z.record(z.any()).optional(),
              $searchMeta: z.record(z.any()).optional(),
              $set: z.record(z.any()).optional(),
              $setWindowFields: z.record(z.any()).optional(),
              $sortByCount: z
                .union([z.string(), z.record(z.any())])
                .optional(),
              $unionWith: z.record(z.any()).optional(),
              // $unset accepts a single field name or a list of field names.
              $unset: z.union([z.string(), z.array(z.string())]).optional(),
            })
            // Without strict(), an unsupported stage key is silently stripped:
            // it either yields a confusing "exactly one field" failure or,
            // when paired with a supported key, is dropped unnoticed.
            .strict()
            .refine(
              (obj) => {
                // Count the stage operators that are actually present.
                const definedFields = Object.keys(obj).filter(
                  (key) => obj[key as keyof typeof obj] !== undefined
                );
                return definedFields.length === 1;
              },
              {
                message: "Each pipeline stage must contain exactly one field",
              }
            )
        ),
        // z.object({}) would strip every key, so no option could ever reach
        // the driver; a record keeps whatever options the caller supplies.
        options: z.record(z.any()).optional(),
      },
      async ({ collectionName, pipeline, options }) => {
        try {
          let db = mongodbConnection.getDb();
          if (!db) {
            // Lazy connect: no handle is available until the first successful connect.
            await mongodbConnection.connect(this.MONGODB_URI);
            db = mongodbConnection.getDb();
            if (!db) throw new Error("Failed to connect to database");
          }
          const collection = db.collection(collectionName);
          // Caller options are spread last, so a supplied maxTimeMS overrides the default.
          const result = await collection
            .aggregate(pipeline, { maxTimeMS: 30000, ...options })
            .toArray();
          return {
            content: [
              {
                type: "text",
                text: JSON.stringify(result),
              },
            ],
          };
        } catch (error) {
          console.error(error);
          return {
            // Mark the result as a tool-execution failure so clients do not
            // mistake the error message for aggregation output.
            isError: true,
            content: [{ type: "text", text: "Error: " + error }],
          };
        }
      }
    );

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/manpreet2000/mcp-database-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.