# server.py (27.3 kB)
import base64
import json
import logging
from typing import Optional, Dict, Any, List, Union
from urllib.parse import quote

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Setup logging
# Configure the root logger at INFO and create the module-level logger used by
# the request helper and the MCP dispatcher below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# The FastAPI application; every @app.get / @app.post route in this file is
# registered on this object.
app = FastAPI(
    title="Supabase MCP Server",
    description="Full-featured MCP server for Supabase with CRUD, SQL, and RPC support",
    version="2.0.0"
)
# ============================================================================
# MODELS
# ============================================================================
class SupabaseCredentials(BaseModel):
    """Connection details for one Supabase project, supplied per request.

    Only ``url`` is required by the model. Both keys are optional here;
    ``get_api_key`` picks ``service_key`` first and falls back to ``anon_key``.
    """

    # Project base URL; handlers append "/rest/v1/..." or "/storage/v1/...".
    url: str
    # Anon/public API key.
    anon_key: Optional[str] = None
    # Service role key; required by the raw SQL handler.
    service_key: Optional[str] = None
class QueryRequest(BaseModel):
    """Request body for ``POST /api/query`` (select rows from a table)."""

    credentials: SupabaseCredentials
    table: str
    # Column list placed in the ``select=`` query parameter; "*" selects all.
    select: str = "*"
    # Either {"column": value} (equality) or {"column": {"operator": value}}.
    filters: Optional[Dict[str, Any]] = None
    limit: Optional[int] = None
    offset: Optional[int] = None
    # Placed in the ``order=`` query parameter, e.g. "created_at.desc".
    order: Optional[str] = None
class InsertRequest(BaseModel):
    """Request body for ``POST /api/insert`` (insert one or many rows)."""

    credentials: SupabaseCredentials
    table: str
    # A single row object, or a list of row objects for a bulk insert.
    data: Union[Dict[str, Any], List[Dict[str, Any]]]
class UpdateRequest(BaseModel):
    """Request body for ``POST /api/update`` (patch rows matching filters)."""

    credentials: SupabaseCredentials
    table: str
    # Column/value pairs to write onto every matching row.
    data: Dict[str, Any]
    # Selects which rows to update; same format as ``QueryRequest.filters``.
    filters: Dict[str, Any]
class DeleteRequest(BaseModel):
    """Request body for ``POST /api/delete`` (delete rows matching filters)."""

    credentials: SupabaseCredentials
    table: str
    # Selects which rows to delete; same format as ``QueryRequest.filters``.
    filters: Dict[str, Any]
class UpsertRequest(BaseModel):
    """Typed shape of the ``supabase_upsert`` tool arguments.

    NOTE(review): no endpoint in the visible part of this file takes this
    model as its body; the upsert tool is reached through ``/mcp/tools/call``.
    """

    credentials: SupabaseCredentials
    table: str
    # A single row object, or a list of row objects.
    data: Union[Dict[str, Any], List[Dict[str, Any]]]
    # Column(s) used to detect a conflicting existing row.
    on_conflict: Optional[str] = None
class SQLRequest(BaseModel):
    """Typed shape of the ``supabase_sql`` tool arguments.

    NOTE(review): no endpoint in the visible part of this file takes this
    model as its body; the SQL tool is reached through ``/mcp/tools/call``.
    """

    credentials: SupabaseCredentials
    # SQL text forwarded to the ``exec_sql`` RPC function.
    query: str
    params: Optional[Dict[str, Any]] = None
class RPCRequest(BaseModel):
    """Request body for ``POST /api/rpc`` (call a stored procedure)."""

    credentials: SupabaseCredentials
    # Name appended to "/rest/v1/rpc/" to form the request URL.
    function_name: str
    # Arguments sent as the JSON body of the RPC call.
    params: Optional[Dict[str, Any]] = None
class StorageUploadRequest(BaseModel):
    """Typed shape of the ``supabase_storage_upload`` tool arguments.

    NOTE(review): no endpoint in the visible part of this file takes this
    model as its body; the tool is reached through ``/mcp/tools/call``.
    """

    credentials: SupabaseCredentials
    bucket: str
    # Object path inside the bucket.
    path: str
    file_data: str  # base64 encoded
    content_type: Optional[str] = "application/octet-stream"
class StorageListRequest(BaseModel):
    """Typed shape of the ``supabase_storage_list`` tool arguments.

    NOTE(review): no endpoint in the visible part of this file takes this
    model as its body; the tool is reached through ``/mcp/tools/call``.
    """

    credentials: SupabaseCredentials
    bucket: str
    # Prefix to list under; empty string lists from the bucket root.
    path: Optional[str] = ""
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def get_api_key(creds: SupabaseCredentials) -> str:
    """Return the API key to authenticate with, preferring the service key.

    Args:
        creds: Credentials carrying ``service_key`` and/or ``anon_key``.

    Returns:
        ``creds.service_key`` when it is set and non-empty, otherwise
        ``creds.anon_key``.

    Raises:
        ValueError: If neither key is set. Previously this case returned
            ``None`` despite the ``str`` annotation, and the ``None`` was
            then placed into the request headers.
    """
    api_key = creds.service_key or creds.anon_key
    if not api_key:
        raise ValueError(
            "Supabase credentials must include 'service_key' or 'anon_key'"
        )
    return api_key
def _format_filter_value(value: Any) -> str:
    """Render a Python filter operand as PostgREST query-string text."""
    if value is None:
        return "null"
    # bool must be checked explicitly: str(True) is "True", not "true".
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def build_filter_string(filters: Dict[str, Any]) -> str:
    """Convert a filters dict to Supabase (PostgREST) query-string format.

    The result starts with "&" so it can be appended directly after an
    existing query parameter; callers that need it to start the query
    string must strip that leading "&" and add "?" themselves.

    Examples:
        {"id": "1"} → "&id=eq.1"
        {"id": {"eq": "1"}} → "&id=eq.1"
        {"age": {"gt": "18"}} → "&age=gt.18"
        {"name": {"like": "*John*"}} → "&name=like.*John*"
        {"status": {"in": "(active,pending)"}} → "&status=in.(active,pending)"
        {"deleted_at": None} → "&deleted_at=is.null"

    Args:
        filters: Mapping of column name to either a plain value (equality)
            or a ``{operator: value}`` dict. May be empty or ``None``.

    Returns:
        The "&"-prefixed filter string, or "" when there is nothing to filter.
    """
    if not filters:
        return ""

    # Characters that are part of PostgREST's own syntax (like-wildcards,
    # in-lists, dotted operators, timestamps) and must stay readable.
    # Everything else, notably "&", "=", "+", "#" and spaces, is
    # percent-encoded so a value cannot corrupt or inject query parameters.
    safe = "*(),.:"

    filter_parts = []
    for key, value in filters.items():
        column = quote(str(key), safe=safe)
        if isinstance(value, dict):
            # Format: {"column": {"operator": "value"}}
            for op, val in value.items():
                operator = quote(str(op), safe=".")
                operand = quote(_format_filter_value(val), safe=safe)
                filter_parts.append(f"{column}={operator}.{operand}")
        elif value is None:
            # "eq.null" never matches; a NULL test needs the "is" operator.
            filter_parts.append(f"{column}=is.null")
        else:
            # Simple format: {"column": "value"} - default to eq
            operand = quote(_format_filter_value(value), safe=safe)
            filter_parts.append(f"{column}=eq.{operand}")

    return "&" + "&".join(filter_parts) if filter_parts else ""
async def supabase_request(
    method: str,
    url: str,
    api_key: str,
    data: Optional[Any] = None,
    params: Optional[Dict] = None,
    headers_override: Optional[Dict] = None
) -> Dict[str, Any]:
    """Make a request to the Supabase REST API and normalise the outcome.

    Args:
        method: One of "GET", "POST", "PATCH" or "DELETE".
        url: Full request URL, including any query string.
        api_key: Sent both as the ``apikey`` header and as a Bearer token.
        data: JSON-serialisable body; sent for POST and PATCH only.
        params: Optional query parameters handed to httpx for every method
            (previously they were silently dropped for non-GET methods).
        headers_override: Headers merged over the defaults; a key given here
            replaces the default of the same name, including ``Prefer``.

    Returns:
        On success ``{"data": <parsed JSON or raw text>, "status": <code>}``.
        On an HTTP status >= 400
        ``{"error": <parsed JSON or raw text>, "status_code": <code>}``.
        On an unsupported method or any exception raised while sending,
        ``{"error": <message>}`` with no status code.
    """
    # Reject bad methods before paying for a client/connection.
    if method not in ("GET", "POST", "PATCH", "DELETE"):
        return {"error": f"Unsupported method: {method}"}

    headers = {
        "apikey": api_key,
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Prefer": "return=representation"
    }
    if headers_override:
        headers.update(headers_override)

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            logger.info("%s %s", method, url)
            if method == "GET":
                response = await client.get(url, headers=headers, params=params)
            elif method == "POST":
                response = await client.post(
                    url, headers=headers, params=params, json=data
                )
            elif method == "PATCH":
                response = await client.patch(
                    url, headers=headers, params=params, json=data
                )
            else:  # "DELETE": the only remaining supported method
                response = await client.delete(url, headers=headers, params=params)
    except Exception as e:
        # Boundary catch: callers expect an error dict, never an exception.
        logger.exception("Request failed: %s", e)
        return {"error": str(e)}

    logger.info("Response status: %s", response.status_code)

    # Bodies are usually JSON, but may be empty or plain text; fall back to
    # the raw text instead of failing. JSON decode errors are ValueErrors.
    try:
        payload = response.json()
    except ValueError:
        payload = response.text

    if response.status_code >= 400:
        return {"error": payload, "status_code": response.status_code}
    return {"data": payload, "status": response.status_code}
# ============================================================================
# MCP PROTOCOL ENDPOINTS
# ============================================================================
def _credentials_schema() -> Dict[str, Any]:
    """Return a fresh JSON-schema fragment for the common ``credentials`` argument."""
    return {
        "type": "object",
        "properties": {
            "url": {"type": "string"},
            "anon_key": {"type": "string"},
            "service_key": {"type": "string"}
        },
        "required": ["url"]
    }


def _tool_definition(
    name: str,
    description: str,
    properties: Dict[str, Any],
    required: List[str]
) -> Dict[str, Any]:
    """Assemble one MCP tool entry with an object-typed ``inputSchema``."""
    return {
        "name": name,
        "description": description,
        "inputSchema": {
            "type": "object",
            "properties": properties,
            "required": required
        }
    }


@app.post("/mcp/tools/list")
async def list_tools():
    """List all available MCP tools.

    Returns:
        A JSON-RPC 2.0 response whose ``result.tools`` holds one definition
        (name, description, inputSchema) per tool that ``call_tool`` routes.

    NOTE(review): the response ``id`` is the constant 1 because this endpoint
    does not read the request body; echoing the caller's id would need a
    signature change.
    """
    tools = [
        _tool_definition(
            "supabase_query",
            "Query/Select data from a Supabase table with filters, pagination, and ordering",
            {
                # Only this tool documents the individual credential fields.
                "credentials": {
                    "type": "object",
                    "properties": {
                        "url": {"type": "string", "description": "Supabase project URL"},
                        "anon_key": {"type": "string", "description": "Anon/public API key"},
                        "service_key": {"type": "string", "description": "Service role key (optional, for admin access)"}
                    },
                    "required": ["url"]
                },
                "table": {"type": "string", "description": "Table name"},
                "select": {"type": "string", "default": "*", "description": "Columns to select"},
                "filters": {"type": "object", "description": "Filter conditions (e.g., {'id': '1'} or {'age': {'gt': '18'}})"},
                "limit": {"type": "integer", "description": "Max rows to return"},
                "offset": {"type": "integer", "description": "Rows to skip"},
                "order": {"type": "string", "description": "Order by column (e.g., 'created_at.desc')"}
            },
            ["credentials", "table"]
        ),
        _tool_definition(
            "supabase_insert",
            "Insert new row(s) into a Supabase table",
            {
                "credentials": _credentials_schema(),
                "table": {"type": "string"},
                "data": {"type": ["object", "array"], "description": "Single object or array of objects to insert"}
            },
            ["credentials", "table", "data"]
        ),
        _tool_definition(
            "supabase_update",
            "Update existing row(s) in a Supabase table",
            {
                "credentials": _credentials_schema(),
                "table": {"type": "string"},
                "data": {"type": "object", "description": "Data to update"},
                "filters": {"type": "object", "description": "Which rows to update"}
            },
            ["credentials", "table", "data", "filters"]
        ),
        _tool_definition(
            "supabase_delete",
            "Delete row(s) from a Supabase table",
            {
                "credentials": _credentials_schema(),
                "table": {"type": "string"},
                "filters": {"type": "object", "description": "Which rows to delete"}
            },
            ["credentials", "table", "filters"]
        ),
        _tool_definition(
            "supabase_upsert",
            "Insert or update row(s) - insert new or update if exists",
            {
                "credentials": _credentials_schema(),
                "table": {"type": "string"},
                "data": {"type": ["object", "array"]},
                "on_conflict": {"type": "string", "description": "Column(s) to check for conflicts"}
            },
            ["credentials", "table", "data"]
        ),
        _tool_definition(
            "supabase_sql",
            "Execute raw SQL query (requires service_key)",
            {
                # SQL is the one tool whose schema demands the service key.
                "credentials": {
                    "type": "object",
                    "properties": {
                        "url": {"type": "string"},
                        "service_key": {"type": "string"}
                    },
                    "required": ["url", "service_key"]
                },
                "query": {"type": "string", "description": "SQL query to execute"},
                "params": {"type": "object", "description": "Query parameters (optional)"}
            },
            ["credentials", "query"]
        ),
        _tool_definition(
            "supabase_rpc",
            "Call a Supabase RPC function (stored procedure)",
            {
                "credentials": _credentials_schema(),
                "function_name": {"type": "string", "description": "Name of the RPC function"},
                "params": {"type": "object", "description": "Function parameters"}
            },
            ["credentials", "function_name"]
        ),
        _tool_definition(
            "supabase_storage_upload",
            "Upload file to Supabase Storage",
            {
                "credentials": _credentials_schema(),
                "bucket": {"type": "string", "description": "Storage bucket name"},
                "path": {"type": "string", "description": "File path in bucket"},
                "file_data": {"type": "string", "description": "Base64 encoded file data"},
                "content_type": {"type": "string", "default": "application/octet-stream"}
            },
            ["credentials", "bucket", "path", "file_data"]
        ),
        _tool_definition(
            "supabase_storage_list",
            "List files in Supabase Storage bucket",
            {
                "credentials": _credentials_schema(),
                "bucket": {"type": "string"},
                "path": {"type": "string", "default": ""}
            },
            ["credentials", "bucket"]
        )
    ]
    return {
        "jsonrpc": "2.0",
        "id": 1,
        "result": {"tools": tools}
    }
def _jsonrpc_error(request_id: Any, code: int, message: Any) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response envelope."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message}
    }


@app.post("/mcp/tools/call")
async def call_tool(request: Request):
    """Execute an MCP tool named in a JSON-RPC style request body.

    The body is expected to look like
    ``{"id": ..., "params": {"name": <tool>, "arguments": {...}}}``.

    Returns:
        A JSON-RPC 2.0 envelope with either ``result`` (the handler's return
        value) or ``error``. Error codes: -32700 unparseable body, -32600
        body or ``params`` is not an object, 404 unknown tool, 400 missing or
        invalid arguments, 500 any other failure; handler-reported errors
        carry the handler's own ``code``.
    """
    # Parsing used to sit outside any try block, so a malformed body escaped
    # as an unhandled server error instead of a JSON-RPC error.
    try:
        body = await request.json()
    except ValueError:
        return _jsonrpc_error(None, -32700, "Parse error: request body is not valid JSON")
    if not isinstance(body, dict):
        return _jsonrpc_error(None, -32600, "Invalid Request: expected a JSON object")

    request_id = body.get("id", 1)
    # "params": null must behave like a missing key, not crash on None.get().
    params = body.get("params") or {}
    if not isinstance(params, dict):
        return _jsonrpc_error(request_id, -32600, "Invalid Request: 'params' must be an object")
    tool_name = params.get("name")
    arguments = params.get("arguments") or {}

    # Built per call because the handlers are defined further down the module.
    handlers = {
        "supabase_query": handle_query,
        "supabase_insert": handle_insert,
        "supabase_update": handle_update,
        "supabase_delete": handle_delete,
        "supabase_upsert": handle_upsert,
        "supabase_sql": handle_sql,
        "supabase_rpc": handle_rpc,
        "supabase_storage_upload": handle_storage_upload,
        "supabase_storage_list": handle_storage_list
    }
    handler = handlers.get(tool_name)
    if not handler:
        return _jsonrpc_error(request_id, 404, f"Tool not found: {tool_name}")

    try:
        result = await handler(arguments)
    except KeyError as e:
        # Handlers index required arguments directly; name the missing one
        # rather than returning the bare key repr as the whole message.
        logger.error("Tool %s called without required argument %s", tool_name, e)
        return _jsonrpc_error(request_id, 400, f"Missing required argument: {e}")
    except ValueError as e:
        # Covers pydantic validation errors and other bad-input failures.
        logger.error("Tool %s received invalid arguments: %s", tool_name, e)
        return _jsonrpc_error(request_id, 400, str(e))
    except Exception as e:
        logger.exception("Tool execution failed: %s", e)
        return _jsonrpc_error(request_id, 500, str(e))

    if "error" in result:
        return _jsonrpc_error(request_id, result.get("code", 500), result["error"])
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": result
    }
# ============================================================================
# TOOL HANDLERS
# ============================================================================
async def handle_query(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the supabase_query tool: select rows from a table.

    Args:
        args: Must contain ``credentials`` and ``table``; may contain
            ``select``, ``filters``, ``limit``, ``offset`` and ``order``.

    Returns:
        ``{"data": ..., "count": n}`` on success, where ``count`` is the list
        length (or 1 for a non-list payload), otherwise
        ``{"error": ..., "code": ...}``.

    Raises:
        KeyError: If ``credentials`` or ``table`` is missing.
        ValueError: If ``limit`` or ``offset`` is not an integer value.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    select = args.get("select") or "*"
    filters = args.get("filters")
    limit = args.get("limit")
    offset = args.get("offset")
    order = args.get("order")

    # Percent-encode caller-supplied pieces so they cannot break out of their
    # query parameter; PostgREST syntax characters are kept readable.
    base = creds.url.rstrip("/")
    url = (
        f"{base}/rest/v1/{quote(str(table), safe='')}"
        f"?select={quote(str(select), safe='*,().:!')}"
    )
    if filters:
        # build_filter_string output already starts with "&".
        url += build_filter_string(filters)
    # "is not None" rather than truthiness, so an explicit 0 is honoured.
    if limit is not None:
        url += f"&limit={int(limit)}"
    if offset is not None:
        url += f"&offset={int(offset)}"
    if order:
        url += f"&order={quote(str(order), safe=',.()')}"

    api_key = get_api_key(creds)
    result = await supabase_request("GET", url, api_key)
    if "error" in result:
        return {
            "error": f"Supabase error: {json.dumps(result['error'])}",
            "code": result.get("status_code", 500)
        }

    rows = result["data"]
    return {"data": rows, "count": len(rows) if isinstance(rows, list) else 1}
async def handle_insert(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the supabase_insert tool: insert one row or a list of rows.

    Args:
        args: Must contain ``credentials``, ``table`` and ``data`` (a row
            object or a list of row objects).

    Returns:
        ``{"data": ..., "inserted": n}`` on success, where ``inserted`` is the
        list length (or 1 for a non-list payload), otherwise
        ``{"error": ..., "code": ...}``.

    Raises:
        KeyError: If a required argument is missing.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    data = args["data"]

    # Tolerate a trailing slash on the project URL and encode the table name.
    url = f"{creds.url.rstrip('/')}/rest/v1/{quote(str(table), safe='')}"
    api_key = get_api_key(creds)
    result = await supabase_request("POST", url, api_key, data=data)
    if "error" in result:
        return {
            "error": f"Supabase error: {json.dumps(result['error'])}",
            "code": result.get("status_code", 400)
        }

    rows = result["data"]
    return {"data": rows, "inserted": len(rows) if isinstance(rows, list) else 1}
async def handle_update(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the supabase_update tool: patch the rows matching ``filters``.

    Args:
        args: Must contain ``credentials``, ``table``, ``data`` (columns to
            write) and ``filters`` (which rows to update).

    Returns:
        ``{"data": ..., "updated": n}`` on success, where ``updated`` is the
        list length (or 1 for a non-list payload), otherwise
        ``{"error": ..., "code": ...}``. A request whose filters produce no
        condition is rejected with code 400 instead of touching every row.

    Raises:
        KeyError: If a required argument is missing.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    data = args["data"]
    filters = args["filters"]

    # build_filter_string returns "&a=eq.1&..." for appending after an
    # existing parameter. This URL has no query yet, so the leading "&" must
    # become "?"; appending it verbatim put the filters into the path.
    query = build_filter_string(filters).lstrip("&")
    if not query:
        return {
            "error": "Update requires at least one filter; refusing to modify every row",
            "code": 400
        }
    url = f"{creds.url.rstrip('/')}/rest/v1/{quote(str(table), safe='')}?{query}"

    api_key = get_api_key(creds)
    result = await supabase_request("PATCH", url, api_key, data=data)
    if "error" in result:
        return {
            "error": f"Supabase error: {json.dumps(result['error'])}",
            "code": result.get("status_code", 400)
        }

    rows = result["data"]
    return {"data": rows, "updated": len(rows) if isinstance(rows, list) else 1}
async def handle_delete(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the supabase_delete tool: delete the rows matching ``filters``.

    Args:
        args: Must contain ``credentials``, ``table`` and ``filters``.

    Returns:
        ``{"data": [...], "deleted": n}`` on success, where ``deleted`` is the
        number of rows in a list payload (0 for anything else), otherwise
        ``{"error": ..., "code": ...}``. A request whose filters produce no
        condition is rejected with code 400 instead of emptying the table.

    Raises:
        KeyError: If a required argument is missing.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    filters = args["filters"]

    # build_filter_string returns "&a=eq.1&..." for appending after an
    # existing parameter. This URL has no query yet, so the leading "&" must
    # become "?"; appending it verbatim put the filters into the path.
    query = build_filter_string(filters).lstrip("&")
    if not query:
        return {
            "error": "Delete requires at least one filter; refusing to delete every row",
            "code": 400
        }
    url = f"{creds.url.rstrip('/')}/rest/v1/{quote(str(table), safe='')}?{query}"

    api_key = get_api_key(creds)
    result = await supabase_request("DELETE", url, api_key)
    if "error" in result:
        return {
            "error": f"Supabase error: {json.dumps(result['error'])}",
            "code": result.get("status_code", 400)
        }

    # An empty body (no representation returned) is reported as no rows.
    rows = result.get("data") or []
    return {"data": rows, "deleted": len(rows) if isinstance(rows, list) else 0}
async def handle_upsert(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the supabase_upsert tool: insert rows, merging on conflict.

    Args:
        args: Must contain ``credentials``, ``table`` and ``data`` (a row
            object or list of rows); may contain ``on_conflict`` (column or
            comma-separated columns used to detect an existing row).

    Returns:
        ``{"data": ..., "upserted": n}`` on success, where ``upserted`` is the
        list length (or 1 for a non-list payload), otherwise
        ``{"error": ..., "code": ...}``.

    Raises:
        KeyError: If a required argument is missing.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    data = args["data"]
    on_conflict = args.get("on_conflict")

    url = f"{creds.url.rstrip('/')}/rest/v1/{quote(str(table), safe='')}"
    # PostgREST reads the conflict target from the "on_conflict" query
    # parameter; it is not a Prefer-header token.
    if on_conflict:
        url += f"?on_conflict={quote(str(on_conflict), safe=',')}"

    # The override replaces the default Prefer header wholesale, so
    # "return=representation" must be repeated here or the response has no
    # rows to report back.
    headers_override = {
        "Prefer": "resolution=merge-duplicates,return=representation"
    }

    api_key = get_api_key(creds)
    result = await supabase_request(
        "POST", url, api_key, data=data, headers_override=headers_override
    )
    if "error" in result:
        return {
            "error": f"Supabase error: {json.dumps(result['error'])}",
            "code": result.get("status_code", 400)
        }

    rows = result["data"]
    return {"data": rows, "upserted": len(rows) if isinstance(rows, list) else 1}
async def handle_sql(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the supabase_sql tool: run raw SQL through an RPC function.

    The query is POSTed to ``/rest/v1/rpc/exec_sql`` as
    ``{"query": ..., "params": ...}``. Whether that function exists depends
    on the project's setup; the REST API does not provide direct SQL itself.

    Args:
        args: Must contain ``credentials`` (with ``service_key``) and
            ``query``; may contain ``params``.

    Returns:
        ``{"data": ...}`` on success, ``{"error": ..., "code": 403}`` when no
        service key was supplied, otherwise ``{"error": ..., "code": ...}``.

    Raises:
        KeyError: If ``credentials`` or ``query`` is missing.
    """
    creds = SupabaseCredentials(**args["credentials"])
    query = args["query"]
    # An explicit "params": null is treated like an absent key.
    params = args.get("params") or {}

    if not creds.service_key:
        return {"error": "SQL queries require service_key", "code": 403}

    # Use PostgREST's rpc endpoint or direct SQL execution
    # Note: Direct SQL might not be available via REST API, depends on setup
    url = f"{creds.url.rstrip('/')}/rest/v1/rpc/exec_sql"
    data = {"query": query}
    if params:
        data["params"] = params

    # Always authenticate with the service key here, never the anon key.
    result = await supabase_request("POST", url, creds.service_key, data=data)
    if "error" in result:
        return {
            "error": f"SQL execution failed: {json.dumps(result['error'])}",
            "code": result.get("status_code", 500)
        }
    return {"data": result["data"]}
async def handle_rpc(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the supabase_rpc tool: call a stored procedure.

    Args:
        args: Must contain ``credentials`` and ``function_name``; may contain
            ``params`` (the function's arguments).

    Returns:
        ``{"data": ...}`` on success, otherwise ``{"error": ..., "code": ...}``.

    Raises:
        KeyError: If ``credentials`` or ``function_name`` is missing.
    """
    creds = SupabaseCredentials(**args["credentials"])
    function_name = args["function_name"]
    # The legacy /api/rpc route always passes the key, with None when the
    # caller omitted it. Fall back to {} so a JSON object body is still sent.
    params = args.get("params") or {}

    url = (
        f"{creds.url.rstrip('/')}/rest/v1/rpc/"
        f"{quote(str(function_name), safe='')}"
    )
    api_key = get_api_key(creds)
    result = await supabase_request("POST", url, api_key, data=params)
    if "error" in result:
        return {
            "error": f"RPC call failed: {json.dumps(result['error'])}",
            "code": result.get("status_code", 400)
        }
    return {"data": result["data"]}
async def handle_storage_upload(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the supabase_storage_upload tool: upload a base64-encoded file.

    Args:
        args: Must contain ``credentials``, ``bucket``, ``path`` and
            ``file_data`` (base64 text); may contain ``content_type``.

    Returns:
        ``{"data": <response body>, "status": "uploaded"}`` on success,
        otherwise ``{"error": ..., "code": ...}``: 400 for undecodable
        ``file_data``, 500 when the request could not be sent, or the HTTP
        status returned by the storage endpoint.

    Raises:
        KeyError: If a required argument is missing.
    """
    creds = SupabaseCredentials(**args["credentials"])
    bucket = args["bucket"]
    path = args["path"]
    file_data = args["file_data"]
    # "or" also covers an explicit None, which must not become a header value.
    content_type = args.get("content_type") or "application/octet-stream"

    # Malformed base64 raises binascii.Error, a ValueError subclass.
    try:
        file_bytes = base64.b64decode(file_data)
    except (ValueError, TypeError) as exc:
        return {"error": f"Invalid base64 file_data: {exc}", "code": 400}

    # Keep "/" in the object path so nested folders survive encoding.
    url = (
        f"{creds.url.rstrip('/')}/storage/v1/object/"
        f"{quote(str(bucket), safe='')}/{quote(str(path), safe='/')}"
    )
    api_key = get_api_key(creds)
    headers = {
        "apikey": api_key,
        "Authorization": f"Bearer {api_key}",
        "Content-Type": content_type
    }

    # Same 30 s timeout as supabase_request; transport failures are reported
    # as an error dict like every other handler instead of propagating.
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(url, headers=headers, content=file_bytes)
    except httpx.HTTPError as exc:
        logger.error("Storage upload failed: %s", exc)
        return {"error": str(exc), "code": 500}

    if response.status_code >= 400:
        return {"error": response.text, "code": response.status_code}

    # Do not assume the success body is JSON.
    try:
        payload = response.json()
    except ValueError:
        payload = response.text
    return {"data": payload, "status": "uploaded"}
async def handle_storage_list(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the supabase_storage_list tool: list objects in a bucket.

    Args:
        args: Must contain ``credentials`` and ``bucket``; may contain
            ``path``, the prefix to list under (default: bucket root).

    Returns:
        ``{"data": ..., "count": n}`` on success, where ``count`` is the list
        length (0 for a non-list payload), otherwise
        ``{"error": ..., "code": ...}``.

    Raises:
        KeyError: If ``credentials`` or ``bucket`` is missing.
    """
    creds = SupabaseCredentials(**args["credentials"])
    bucket = args["bucket"]
    path = args.get("path") or ""

    # The Storage list endpoint is a POST taking the prefix in a JSON body;
    # a GET with "?prefix=" does not reach it.
    url = (
        f"{creds.url.rstrip('/')}/storage/v1/object/list/"
        f"{quote(str(bucket), safe='')}"
    )
    api_key = get_api_key(creds)
    result = await supabase_request("POST", url, api_key, data={"prefix": path})
    if "error" in result:
        return {
            "error": f"Storage list failed: {json.dumps(result['error'])}",
            "code": result.get("status_code", 400)
        }

    entries = result["data"]
    return {
        "data": entries,
        "count": len(entries) if isinstance(entries, list) else 0
    }
# ============================================================================
# LEGACY REST API ENDPOINTS (backward compatibility)
# ============================================================================
@app.post("/api/query")
async def api_query(request: QueryRequest):
    """Legacy REST endpoint for selecting rows; delegates to ``handle_query``.

    Returns the handler's result dict on success. When the result carries an
    ``error``, it is sent as a JSON response whose HTTP status is the
    result's ``code`` (500 if absent).
    """
    try:
        result = await handle_query(request.dict())
    except ValueError as exc:
        # Invalid input detected inside the handler is a client error, not
        # an unhandled server failure.
        return JSONResponse(status_code=400, content={"error": str(exc), "code": 400})
    if "error" in result:
        return JSONResponse(status_code=result.get("code", 500), content=result)
    return result
@app.post("/api/insert")
async def api_insert(request: InsertRequest):
    """Legacy REST endpoint for inserting rows; delegates to ``handle_insert``.

    Returns the handler's result dict on success. When the result carries an
    ``error``, it is sent as a JSON response whose HTTP status is the
    result's ``code`` (500 if absent).
    """
    try:
        result = await handle_insert(request.dict())
    except ValueError as exc:
        # Invalid input detected inside the handler is a client error, not
        # an unhandled server failure.
        return JSONResponse(status_code=400, content={"error": str(exc), "code": 400})
    if "error" in result:
        return JSONResponse(status_code=result.get("code", 500), content=result)
    return result
@app.post("/api/update")
async def api_update(request: UpdateRequest):
    """Legacy REST endpoint for updating rows; delegates to ``handle_update``.

    Returns the handler's result dict on success. When the result carries an
    ``error``, it is sent as a JSON response whose HTTP status is the
    result's ``code`` (500 if absent).
    """
    try:
        result = await handle_update(request.dict())
    except ValueError as exc:
        # Invalid input detected inside the handler is a client error, not
        # an unhandled server failure.
        return JSONResponse(status_code=400, content={"error": str(exc), "code": 400})
    if "error" in result:
        return JSONResponse(status_code=result.get("code", 500), content=result)
    return result
@app.post("/api/delete")
async def api_delete(request: DeleteRequest):
    """Legacy REST endpoint for deleting rows; delegates to ``handle_delete``.

    Returns the handler's result dict on success. When the result carries an
    ``error``, it is sent as a JSON response whose HTTP status is the
    result's ``code`` (500 if absent).
    """
    try:
        result = await handle_delete(request.dict())
    except ValueError as exc:
        # Invalid input detected inside the handler is a client error, not
        # an unhandled server failure.
        return JSONResponse(status_code=400, content={"error": str(exc), "code": 400})
    if "error" in result:
        return JSONResponse(status_code=result.get("code", 500), content=result)
    return result
@app.post("/api/rpc")
async def api_rpc(request: RPCRequest):
    """Legacy REST endpoint for calling an RPC function; delegates to ``handle_rpc``.

    Returns the handler's result dict on success. When the result carries an
    ``error``, it is sent as a JSON response whose HTTP status is the
    result's ``code`` (500 if absent).
    """
    try:
        result = await handle_rpc(request.dict())
    except ValueError as exc:
        # Invalid input detected inside the handler is a client error, not
        # an unhandled server failure.
        return JSONResponse(status_code=400, content={"error": str(exc), "code": 400})
    if "error" in result:
        return JSONResponse(status_code=result.get("code", 500), content=result)
    return result
# ============================================================================
# INFO ENDPOINTS
# ============================================================================
@app.get("/")
async def root():
    """Return static service metadata: name, version, features and endpoint map."""
    return {
        "name": "Supabase MCP Server",
        "version": "2.0.0",
        "description": "Full-featured MCP server for Supabase",
        "features": [
            "CRUD operations (Query, Insert, Update, Delete, Upsert)",
            "Raw SQL queries",
            "RPC function calls",
            "Storage management",
            "Compatible with Claude Desktop and n8n",
            "Works with any Supabase project"
        ],
        "endpoints": {
            "mcp": {
                "list_tools": "POST /mcp/tools/list",
                "call_tool": "POST /mcp/tools/call"
            },
            "rest_api": {
                "query": "POST /api/query",
                "insert": "POST /api/insert",
                "update": "POST /api/update",
                "delete": "POST /api/delete",
                "rpc": "POST /api/rpc"
            }
        }
    }
@app.get("/health")
async def health():
    """Liveness probe: return a constant status payload without contacting Supabase."""
    return {"status": "healthy", "version": "2.0.0"}
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    # Serve on every network interface, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)