#!/usr/bin/env python3
"""
MongoDB MCP Server for Claude Desktop
Provides MongoDB operations via Model Context Protocol
"""
import sys
import json
import asyncio
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from mcp.server import Server
from mcp.types import Tool, TextContent
from mcp.server.stdio import stdio_server
import src.mongodb_db as mongo
def load_config():
"""Load database configuration"""
try:
# Try multiple possible paths
possible_paths = [
Path(__file__).parent.parent / 'config.json', # Standard location
Path.cwd() / 'config.json', # Current working directory
Path.home() / 'mcplatestv1' / 'config.json', # Alternative location
]
for config_path in possible_paths:
if config_path.exists():
with open(config_path, 'r') as f:
return json.load(f)
# If no config file found, return empty dict
return {'mongodb': {}}
except Exception as e:
# Return error info for debugging
return {'mongodb': {}, 'error': str(e)}
config = load_config()
mongo_uri = config.get('mongodb', {}).get('uri', '')
server = Server("mongodb-mcp-server")
@server.list_tools()
async def list_tools():
"""List all available MongoDB tools"""
return [
Tool(
name="list_databases",
description="List all databases in MongoDB",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="list_collections",
description="List all collections in a specific database",
inputSchema={
"type": "object",
"properties": {
"database_name": {
"type": "string",
"description": "Name of the database"
}
},
"required": ["database_name"]
}
),
Tool(
name="find_documents",
description="Find documents in a collection. Supports query, limit, skip, and sort options.",
inputSchema={
"type": "object",
"properties": {
"database_name": {"type": "string", "description": "Database name"},
"collection_name": {"type": "string", "description": "Collection name"},
"query": {
"type": "string",
"description": "MongoDB query filter (JSON string, e.g., '{\"name\": \"John\"}')"
},
"limit": {"type": "integer", "description": "Maximum number of documents to return (default: 100)"},
"skip": {"type": "integer", "description": "Number of documents to skip (default: 0)"},
"sort": {
"type": "string",
"description": "Sort criteria (JSON string, e.g., '{\"name\": 1}' for ascending)"
}
},
"required": ["database_name", "collection_name"]
}
),
Tool(
name="insert_document",
description="Insert a single document into a collection",
inputSchema={
"type": "object",
"properties": {
"database_name": {"type": "string", "description": "Database name"},
"collection_name": {"type": "string", "description": "Collection name"},
"document": {
"type": "string",
"description": "Document to insert (JSON string, e.g., '{\"name\": \"John\", \"age\": 30}')"
}
},
"required": ["database_name", "collection_name", "document"]
}
),
Tool(
name="insert_many_documents",
description="Insert multiple documents into a collection",
inputSchema={
"type": "object",
"properties": {
"database_name": {"type": "string", "description": "Database name"},
"collection_name": {"type": "string", "description": "Collection name"},
"documents": {
"type": "string",
"description": "Array of documents to insert (JSON string, e.g., '[{\"name\": \"John\"}, {\"name\": \"Jane\"}]')"
}
},
"required": ["database_name", "collection_name", "documents"]
}
),
Tool(
name="update_document",
description="Update a single document in a collection",
inputSchema={
"type": "object",
"properties": {
"database_name": {"type": "string", "description": "Database name"},
"collection_name": {"type": "string", "description": "Collection name"},
"filter_query": {
"type": "string",
"description": "Filter query to find document (JSON string, e.g., '{\"_id\": \"...\"}')"
},
"update_data": {
"type": "string",
"description": "Update data (JSON string, e.g., '{\"name\": \"John Updated\"}')"
},
"upsert": {
"type": "boolean",
"description": "Create document if it doesn't exist (default: false)"
}
},
"required": ["database_name", "collection_name", "filter_query", "update_data"]
}
),
Tool(
name="update_many_documents",
description="Update multiple documents in a collection",
inputSchema={
"type": "object",
"properties": {
"database_name": {"type": "string", "description": "Database name"},
"collection_name": {"type": "string", "description": "Collection name"},
"filter_query": {
"type": "string",
"description": "Filter query to find documents"
},
"update_data": {
"type": "string",
"description": "Update data for matching documents"
},
"upsert": {"type": "boolean", "description": "Create documents if they don't exist"}
},
"required": ["database_name", "collection_name", "filter_query", "update_data"]
}
),
Tool(
name="delete_document",
description="Delete a single document from a collection",
inputSchema={
"type": "object",
"properties": {
"database_name": {"type": "string", "description": "Database name"},
"collection_name": {"type": "string", "description": "Collection name"},
"filter_query": {
"type": "string",
"description": "Filter query to find document to delete"
}
},
"required": ["database_name", "collection_name", "filter_query"]
}
),
Tool(
name="delete_many_documents",
description="Delete multiple documents from a collection",
inputSchema={
"type": "object",
"properties": {
"database_name": {"type": "string", "description": "Database name"},
"collection_name": {"type": "string", "description": "Collection name"},
"filter_query": {
"type": "string",
"description": "Filter query to find documents to delete"
}
},
"required": ["database_name", "collection_name", "filter_query"]
}
),
Tool(
name="count_documents",
description="Count documents in a collection matching a query",
inputSchema={
"type": "object",
"properties": {
"database_name": {"type": "string", "description": "Database name"},
"collection_name": {"type": "string", "description": "Collection name"},
"query": {
"type": "string",
"description": "Query filter (JSON string, optional)"
}
},
"required": ["database_name", "collection_name"]
}
),
Tool(
name="aggregate",
description="Run MongoDB aggregation pipeline",
inputSchema={
"type": "object",
"properties": {
"database_name": {"type": "string", "description": "Database name"},
"collection_name": {"type": "string", "description": "Collection name"},
"pipeline": {
"type": "string",
"description": "Aggregation pipeline (JSON array string, e.g., '[{\"$match\": {...}}, {\"$group\": {...}}]')"
}
},
"required": ["database_name", "collection_name", "pipeline"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
"""Handle tool execution"""
try:
# Reload config on each call to ensure we have the latest
current_config = load_config()
current_mongo_uri = current_config.get('mongodb', {}).get('uri', '')
if not current_mongo_uri:
# Try to provide helpful error message
config_path = Path(__file__).parent.parent / 'config.json'
error_msg = {
"success": False,
"error": "MongoDB URI not configured",
"details": "Please add 'mongodb.uri' to config.json",
"config_location": str(config_path),
"config_exists": config_path.exists(),
"config_keys": list(current_config.keys())
}
if 'error' in current_config:
error_msg["config_error"] = current_config['error']
return [TextContent(type="text", text=json.dumps(error_msg, indent=2))]
result = None
if name == "list_databases":
result = mongo.list_databases(current_mongo_uri)
elif name == "list_collections":
database_name = arguments.get("database_name")
result = mongo.list_collections(current_mongo_uri, database_name)
elif name == "find_documents":
database_name = arguments.get("database_name")
collection_name = arguments.get("collection_name")
query = arguments.get("query")
limit = arguments.get("limit", 100)
skip = arguments.get("skip", 0)
sort = arguments.get("sort")
result = mongo.find_documents(current_mongo_uri, database_name, collection_name, query, limit, skip, sort)
elif name == "insert_document":
database_name = arguments.get("database_name")
collection_name = arguments.get("collection_name")
document = arguments.get("document")
result = mongo.insert_document(current_mongo_uri, database_name, collection_name, document)
elif name == "insert_many_documents":
database_name = arguments.get("database_name")
collection_name = arguments.get("collection_name")
documents = arguments.get("documents")
result = mongo.insert_many_documents(current_mongo_uri, database_name, collection_name, documents)
elif name == "update_document":
database_name = arguments.get("database_name")
collection_name = arguments.get("collection_name")
filter_query = arguments.get("filter_query")
update_data = arguments.get("update_data")
upsert = arguments.get("upsert", False)
result = mongo.update_document(current_mongo_uri, database_name, collection_name, filter_query, update_data, upsert)
elif name == "update_many_documents":
database_name = arguments.get("database_name")
collection_name = arguments.get("collection_name")
filter_query = arguments.get("filter_query")
update_data = arguments.get("update_data")
upsert = arguments.get("upsert", False)
result = mongo.update_many_documents(current_mongo_uri, database_name, collection_name, filter_query, update_data, upsert)
elif name == "delete_document":
database_name = arguments.get("database_name")
collection_name = arguments.get("collection_name")
filter_query = arguments.get("filter_query")
result = mongo.delete_document(current_mongo_uri, database_name, collection_name, filter_query)
elif name == "delete_many_documents":
database_name = arguments.get("database_name")
collection_name = arguments.get("collection_name")
filter_query = arguments.get("filter_query")
result = mongo.delete_many_documents(current_mongo_uri, database_name, collection_name, filter_query)
elif name == "count_documents":
database_name = arguments.get("database_name")
collection_name = arguments.get("collection_name")
query = arguments.get("query")
result = mongo.count_documents(current_mongo_uri, database_name, collection_name, query)
elif name == "aggregate":
database_name = arguments.get("database_name")
collection_name = arguments.get("collection_name")
pipeline = arguments.get("pipeline")
result = mongo.aggregate(current_mongo_uri, database_name, collection_name, pipeline)
else:
result = {"success": False, "error": f"Unknown tool: {name}"}
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as error:
return [TextContent(
type="text",
text=json.dumps({"success": False, "error": str(error)}, indent=2)
)]
async def main():
"""Main entry point for MCP server"""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())