#!/usr/bin/env python3
"""
PostgreSQL MCP Server for Claude Desktop
Provides database operations via Model Context Protocol
"""
import sys
import json
import asyncio
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from mcp.server import Server
from mcp.types import Tool, TextContent
from mcp.server.stdio import stdio_server
import src.postgres_db as db
def load_config():
"""Load database configuration"""
try:
# Try multiple possible paths
possible_paths = [
Path(__file__).parent.parent / 'config.json', # Standard location
Path.cwd() / 'config.json', # Current working directory
Path.home() / 'mcplatestv1' / 'config.json', # Alternative location
]
for config_path in possible_paths:
if config_path.exists():
with open(config_path, 'r') as f:
return json.load(f)
# If no config file found, return empty dict
return {'database': {}}
except Exception as e:
# Return error info for debugging
return {'database': {}, 'error': str(e)}
config = load_config()
db_url = config.get('database', {}).get('url', '')
server = Server("postgresql-mcp-server")
@server.list_tools()
async def list_tools():
"""List all available database tools"""
return [
Tool(
name="execute_query",
description="Execute a SELECT query and return results. Use for reading data from tables.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "SQL SELECT query to execute"
},
"params": {
"type": "array",
"description": "Optional query parameters for parameterized queries",
"items": {"type": "string"}
}
},
"required": ["query"]
}
),
Tool(
name="execute_write",
description="Execute INSERT, UPDATE, or DELETE queries. Use for modifying data.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "SQL INSERT, UPDATE, or DELETE query"
},
"params": {
"type": "array",
"description": "Optional query parameters",
"items": {"type": "string"}
}
},
"required": ["query"]
}
),
Tool(
name="run_custom_sql",
description="Execute any SQL query (SELECT, INSERT, UPDATE, DELETE, etc.). Automatically handles query type.",
inputSchema={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "Any SQL query to execute"
},
"params": {
"type": "array",
"description": "Optional query parameters",
"items": {"type": "string"}
}
},
"required": ["sql"]
}
),
Tool(
name="list_tables",
description="List all tables in the database with their schemas",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="describe_table",
description="Get detailed schema information for a specific table including columns, types, and constraints",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "Name of the table to describe"
}
},
"required": ["table_name"]
}
),
Tool(
name="get_table_count",
description="Get the total number of rows in a table",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "Name of the table"
}
},
"required": ["table_name"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
"""Handle tool execution"""
try:
# Reload config on each call to ensure we have the latest
current_config = load_config()
current_db_url = current_config.get('database', {}).get('url', '')
if not current_db_url:
# Try to provide helpful error message
config_path = Path(__file__).parent.parent / 'config.json'
error_msg = {
"success": False,
"error": "Database URL not configured",
"details": "Please add 'database.url' to config.json",
"config_location": str(config_path),
"config_exists": config_path.exists(),
"config_keys": list(current_config.keys())
}
if 'error' in current_config:
error_msg["config_error"] = current_config['error']
return [TextContent(type="text", text=json.dumps(error_msg, indent=2))]
result = None
if name == "execute_query":
query = arguments.get("query")
params = arguments.get("params")
result = db.execute_query(current_db_url, query, params)
elif name == "execute_write":
query = arguments.get("query")
params = arguments.get("params")
result = db.execute_write(current_db_url, query, params)
elif name == "run_custom_sql":
sql = arguments.get("sql")
params = arguments.get("params")
result = db.run_custom_sql(current_db_url, sql, params)
elif name == "list_tables":
result = db.list_tables(current_db_url)
elif name == "describe_table":
table_name = arguments.get("table_name")
result = db.describe_table(current_db_url, table_name)
elif name == "get_table_count":
table_name = arguments.get("table_name")
result = db.get_table_count(current_db_url, table_name)
else:
result = {"success": False, "error": f"Unknown tool: {name}"}
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as error:
return [TextContent(
type="text",
text=json.dumps({"success": False, "error": str(error)}, indent=2)
)]
async def main():
"""Main entry point for MCP server"""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())