Skip to main content
Glama
mongodb_mcp.py15.1 kB
#!/usr/bin/env python3 """ MongoDB MCP Server for Claude Desktop Provides MongoDB operations via Model Context Protocol """ import sys import json import asyncio from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) from mcp.server import Server from mcp.types import Tool, TextContent from mcp.server.stdio import stdio_server import src.mongodb_db as mongo def load_config(): """Load database configuration""" try: # Try multiple possible paths possible_paths = [ Path(__file__).parent.parent / 'config.json', # Standard location Path.cwd() / 'config.json', # Current working directory Path.home() / 'mcplatestv1' / 'config.json', # Alternative location ] for config_path in possible_paths: if config_path.exists(): with open(config_path, 'r') as f: return json.load(f) # If no config file found, return empty dict return {'mongodb': {}} except Exception as e: # Return error info for debugging return {'mongodb': {}, 'error': str(e)} config = load_config() mongo_uri = config.get('mongodb', {}).get('uri', '') server = Server("mongodb-mcp-server") @server.list_tools() async def list_tools(): """List all available MongoDB tools""" return [ Tool( name="list_databases", description="List all databases in MongoDB", inputSchema={ "type": "object", "properties": {} } ), Tool( name="list_collections", description="List all collections in a specific database", inputSchema={ "type": "object", "properties": { "database_name": { "type": "string", "description": "Name of the database" } }, "required": ["database_name"] } ), Tool( name="find_documents", description="Find documents in a collection. Supports query, limit, skip, and sort options.", inputSchema={ "type": "object", "properties": { "database_name": {"type": "string", "description": "Database name"}, "collection_name": {"type": "string", "description": "Collection name"}, "query": { "type": "string", "description": "MongoDB query filter (JSON string, e.g., '{\"name\": \"John\"}')" }, "limit": {"type": "integer", "description": "Maximum number of documents to return (default: 100)"}, "skip": {"type": "integer", "description": "Number of documents to skip (default: 0)"}, "sort": { "type": "string", "description": "Sort criteria (JSON string, e.g., '{\"name\": 1}' for ascending)" } }, "required": ["database_name", "collection_name"] } ), Tool( name="insert_document", description="Insert a single document into a collection", inputSchema={ "type": "object", "properties": { "database_name": {"type": "string", "description": "Database name"}, "collection_name": {"type": "string", "description": "Collection name"}, "document": { "type": "string", "description": "Document to insert (JSON string, e.g., '{\"name\": \"John\", \"age\": 30}')" } }, "required": ["database_name", "collection_name", "document"] } ), Tool( name="insert_many_documents", description="Insert multiple documents into a collection", inputSchema={ "type": "object", "properties": { "database_name": {"type": "string", "description": "Database name"}, "collection_name": {"type": "string", "description": "Collection name"}, "documents": { "type": "string", "description": "Array of documents to insert (JSON string, e.g., '[{\"name\": \"John\"}, {\"name\": \"Jane\"}]')" } }, "required": ["database_name", "collection_name", "documents"] } ), Tool( name="update_document", description="Update a single document in a collection", inputSchema={ "type": "object", "properties": { "database_name": {"type": "string", "description": "Database name"}, "collection_name": {"type": "string", "description": "Collection name"}, "filter_query": { "type": "string", "description": "Filter query to find document (JSON string, e.g., '{\"_id\": \"...\"}')" }, "update_data": { "type": "string", "description": "Update data (JSON string, e.g., '{\"name\": \"John Updated\"}')" }, "upsert": { "type": "boolean", "description": "Create document if it doesn't exist (default: false)" } }, "required": ["database_name", "collection_name", "filter_query", "update_data"] } ), Tool( name="update_many_documents", description="Update multiple documents in a collection", inputSchema={ "type": "object", "properties": { "database_name": {"type": "string", "description": "Database name"}, "collection_name": {"type": "string", "description": "Collection name"}, "filter_query": { "type": "string", "description": "Filter query to find documents" }, "update_data": { "type": "string", "description": "Update data for matching documents" }, "upsert": {"type": "boolean", "description": "Create documents if they don't exist"} }, "required": ["database_name", "collection_name", "filter_query", "update_data"] } ), Tool( name="delete_document", description="Delete a single document from a collection", inputSchema={ "type": "object", "properties": { "database_name": {"type": "string", "description": "Database name"}, "collection_name": {"type": "string", "description": "Collection name"}, "filter_query": { "type": "string", "description": "Filter query to find document to delete" } }, "required": ["database_name", "collection_name", "filter_query"] } ), Tool( name="delete_many_documents", description="Delete multiple documents from a collection", inputSchema={ "type": "object", "properties": { "database_name": {"type": "string", "description": "Database name"}, "collection_name": {"type": "string", "description": "Collection name"}, "filter_query": { "type": "string", "description": "Filter query to find documents to delete" } }, "required": ["database_name", "collection_name", "filter_query"] } ), Tool( name="count_documents", description="Count documents in a collection matching a query", inputSchema={ "type": "object", "properties": { "database_name": {"type": "string", "description": "Database name"}, "collection_name": {"type": "string", "description": "Collection name"}, "query": { "type": "string", "description": "Query filter (JSON string, optional)" } }, "required": ["database_name", "collection_name"] } ), Tool( name="aggregate", description="Run MongoDB aggregation pipeline", inputSchema={ "type": "object", "properties": { "database_name": {"type": "string", "description": "Database name"}, "collection_name": {"type": "string", "description": "Collection name"}, "pipeline": { "type": "string", "description": "Aggregation pipeline (JSON array string, e.g., '[{\"$match\": {...}}, {\"$group\": {...}}]')" } }, "required": ["database_name", "collection_name", "pipeline"] } ) ] @server.call_tool() async def call_tool(name: str, arguments: dict): """Handle tool execution""" try: # Reload config on each call to ensure we have the latest current_config = load_config() current_mongo_uri = current_config.get('mongodb', {}).get('uri', '') if not current_mongo_uri: # Try to provide helpful error message config_path = Path(__file__).parent.parent / 'config.json' error_msg = { "success": False, "error": "MongoDB URI not configured", "details": "Please add 'mongodb.uri' to config.json", "config_location": str(config_path), "config_exists": config_path.exists(), "config_keys": list(current_config.keys()) } if 'error' in current_config: error_msg["config_error"] = current_config['error'] return [TextContent(type="text", text=json.dumps(error_msg, indent=2))] result = None if name == "list_databases": result = mongo.list_databases(current_mongo_uri) elif name == "list_collections": database_name = arguments.get("database_name") result = mongo.list_collections(current_mongo_uri, database_name) elif name == "find_documents": database_name = arguments.get("database_name") collection_name = arguments.get("collection_name") query = arguments.get("query") limit = arguments.get("limit", 100) skip = arguments.get("skip", 0) sort = arguments.get("sort") result = mongo.find_documents(current_mongo_uri, database_name, collection_name, query, limit, skip, sort) elif name == "insert_document": database_name = arguments.get("database_name") collection_name = arguments.get("collection_name") document = arguments.get("document") result = mongo.insert_document(current_mongo_uri, database_name, collection_name, document) elif name == "insert_many_documents": database_name = arguments.get("database_name") collection_name = arguments.get("collection_name") documents = arguments.get("documents") result = mongo.insert_many_documents(current_mongo_uri, database_name, collection_name, documents) elif name == "update_document": database_name = arguments.get("database_name") collection_name = arguments.get("collection_name") filter_query = arguments.get("filter_query") update_data = arguments.get("update_data") upsert = arguments.get("upsert", False) result = mongo.update_document(current_mongo_uri, database_name, collection_name, filter_query, update_data, upsert) elif name == "update_many_documents": database_name = arguments.get("database_name") collection_name = arguments.get("collection_name") filter_query = arguments.get("filter_query") update_data = arguments.get("update_data") upsert = arguments.get("upsert", False) result = mongo.update_many_documents(current_mongo_uri, database_name, collection_name, filter_query, update_data, upsert) elif name == "delete_document": database_name = arguments.get("database_name") collection_name = arguments.get("collection_name") filter_query = arguments.get("filter_query") result = mongo.delete_document(current_mongo_uri, database_name, collection_name, filter_query) elif name == "delete_many_documents": database_name = arguments.get("database_name") collection_name = arguments.get("collection_name") filter_query = arguments.get("filter_query") result = mongo.delete_many_documents(current_mongo_uri, database_name, collection_name, filter_query) elif name == "count_documents": database_name = arguments.get("database_name") collection_name = arguments.get("collection_name") query = arguments.get("query") result = mongo.count_documents(current_mongo_uri, database_name, collection_name, query) elif name == "aggregate": database_name = arguments.get("database_name") collection_name = arguments.get("collection_name") pipeline = arguments.get("pipeline") result = mongo.aggregate(current_mongo_uri, database_name, collection_name, pipeline) else: result = {"success": False, "error": f"Unknown tool: {name}"} return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))] except Exception as error: return [TextContent( type="text", text=json.dumps({"success": False, "error": str(error)}, indent=2) )] async def main(): """Main entry point for MCP server""" async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pbulbule13/google-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server