Skip to main content
Glama
server.py (27.3 kB)
"""Supabase MCP server.

Exposes Supabase REST (PostgREST), RPC and Storage operations as MCP tools
(``/mcp/tools/list`` and ``/mcp/tools/call``) plus a small set of legacy REST
endpoints under ``/api``.
"""

import base64
import json
import logging
from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Supabase MCP Server",
    description="Full-featured MCP server for Supabase with CRUD, SQL, and RPC support",
    version="2.0.0"
)


# ============================================================================
# MODELS
# ============================================================================

class SupabaseCredentials(BaseModel):
    """Connection details for one Supabase project."""

    url: str
    anon_key: Optional[str] = None
    service_key: Optional[str] = None


class QueryRequest(BaseModel):
    """Body of the legacy ``POST /api/query`` endpoint."""

    credentials: SupabaseCredentials
    table: str
    select: str = "*"
    filters: Optional[Dict[str, Any]] = None
    limit: Optional[int] = None
    offset: Optional[int] = None
    order: Optional[str] = None


class InsertRequest(BaseModel):
    """Body of the legacy ``POST /api/insert`` endpoint."""

    credentials: SupabaseCredentials
    table: str
    data: Union[Dict[str, Any], List[Dict[str, Any]]]


class UpdateRequest(BaseModel):
    """Body of the legacy ``POST /api/update`` endpoint."""

    credentials: SupabaseCredentials
    table: str
    data: Dict[str, Any]
    filters: Dict[str, Any]


class DeleteRequest(BaseModel):
    """Body of the legacy ``POST /api/delete`` endpoint."""

    credentials: SupabaseCredentials
    table: str
    filters: Dict[str, Any]


class UpsertRequest(BaseModel):
    """Request model for upserts (no legacy endpoint uses it in this file)."""

    credentials: SupabaseCredentials
    table: str
    data: Union[Dict[str, Any], List[Dict[str, Any]]]
    on_conflict: Optional[str] = None


class SQLRequest(BaseModel):
    """Request model for raw SQL (no legacy endpoint uses it in this file)."""

    credentials: SupabaseCredentials
    query: str
    params: Optional[Dict[str, Any]] = None


class RPCRequest(BaseModel):
    """Body of the legacy ``POST /api/rpc`` endpoint."""

    credentials: SupabaseCredentials
    function_name: str
    params: Optional[Dict[str, Any]] = None


class StorageUploadRequest(BaseModel):
    """Request model for storage uploads."""

    credentials: SupabaseCredentials
    bucket: str
    path: str
    file_data: str  # base64 encoded
    content_type: Optional[str] = "application/octet-stream"


class StorageListRequest(BaseModel):
    """Request model for storage listings."""

    credentials: SupabaseCredentials
    bucket: str
    path: Optional[str] = ""


# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

# Characters left unescaped in query values so that operators such as
# "like.*John*" and "in.(a,b)" keep the literal form shown in the docstring
# examples of build_filter_string().
_QUERY_SAFE = "*,():."

_MISSING_KEY_ERROR = "Missing API key: provide anon_key or service_key"


def get_api_key(creds: SupabaseCredentials) -> Optional[str]:
    """Get the appropriate API key (prefer service_key, fallback to anon_key).

    Returns None when the credentials carry neither key.
    """
    return creds.service_key or creds.anon_key


def _base_url(creds: SupabaseCredentials) -> str:
    """Return the project URL without a trailing slash."""
    return creds.url.rstrip("/")


def _build_url(base: str, *query_parts: str) -> str:
    """Join *base* with query fragments using a single leading "?".

    Each fragment is either empty, "k=v", or "&k=v&..." (the shape returned
    by build_filter_string); leading ampersands are stripped before joining.
    """
    query = "&".join(part.lstrip("&") for part in query_parts if part)
    return f"{base}?{query}" if query else base


def _format_filter_value(value: Any) -> str:
    """Render a filter operand as URL-safe text."""
    if value is None:
        return "null"
    # bool must be tested explicitly: str(True) is "True", not "true".
    if isinstance(value, bool):
        return "true" if value else "false"
    return quote(str(value), safe=_QUERY_SAFE)


def _count_rows(data: Any) -> int:
    """Count rows in a response payload: list length, else 0/1 by truthiness."""
    if isinstance(data, list):
        return len(data)
    return 1 if data else 0


def _parse_body(response: httpx.Response) -> Any:
    """Return the decoded JSON body, or the raw text when it is not JSON."""
    try:
        return response.json()
    except ValueError:
        return response.text


def build_filter_string(filters: Dict[str, Any]) -> str:
    """Convert filters dict to Supabase query string format.

    The result is either empty or starts with "&" so it can be appended to an
    existing query string.

    Examples:
        {"id": "1"} → "id=eq.1"
        {"id": {"eq": "1"}} → "id=eq.1"
        {"age": {"gt": "18"}} → "age=gt.18"
        {"name": {"like": "*John*"}} → "name=like.*John*"
        {"status": {"in": "(active,pending)"}} → "status=in.(active,pending)"
        {"deleted_at": None} → "deleted_at=is.null"
    """
    if not filters:
        return ""

    filter_parts = []
    for key, value in filters.items():
        column = quote(str(key), safe=_QUERY_SAFE)
        if isinstance(value, dict):
            # Format: {"column": {"operator": "value"}}
            filter_parts.extend(
                f"{column}={quote(str(op), safe='.')}.{_format_filter_value(val)}"
                for op, val in value.items()
            )
        elif value is None:
            # Equality against NULL never matches; use the "is" operator.
            filter_parts.append(f"{column}=is.null")
        else:
            # Simple format: {"column": "value"} - default to eq
            filter_parts.append(f"{column}=eq.{_format_filter_value(value)}")

    return "&" + "&".join(filter_parts) if filter_parts else ""


async def supabase_request(
    method: str,
    url: str,
    api_key: Optional[str],
    data: Optional[Any] = None,
    params: Optional[Dict] = None,
    headers_override: Optional[Dict] = None
) -> Dict[str, Any]:
    """Make request to Supabase REST API.

    Returns ``{"data": ..., "status": <int>}`` on success, or
    ``{"error": ..., "status_code": <int>}`` for HTTP errors. Transport
    failures and unsupported methods return ``{"error": <str>}`` only.
    """
    if not api_key:
        return {"error": _MISSING_KEY_ERROR, "status_code": 401}

    headers = {
        "apikey": api_key,
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Prefer": "return=representation"
    }
    if headers_override:
        headers.update(headers_override)

    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            logger.info("%s %s", method, url)

            if method == "GET":
                response = await client.get(url, headers=headers, params=params)
            elif method == "POST":
                response = await client.post(url, headers=headers, json=data)
            elif method == "PATCH":
                response = await client.patch(url, headers=headers, json=data)
            elif method == "DELETE":
                response = await client.delete(url, headers=headers)
            else:
                return {"error": f"Unsupported method: {method}"}
        except Exception as e:
            # Boundary handler: callers expect an error dict, never a raise.
            logger.exception("Request failed: %s", e)
            return {"error": str(e)}

    logger.info("Response status: %s", response.status_code)

    if response.status_code >= 400:
        return {"error": _parse_body(response), "status_code": response.status_code}
    return {"data": _parse_body(response), "status": response.status_code}


# ============================================================================
# MCP PROTOCOL ENDPOINTS
# ============================================================================

def _credentials_schema() -> Dict[str, Any]:
    """Return the JSON schema shared by most tools' ``credentials`` argument."""
    return {
        "type": "object",
        "properties": {
            "url": {"type": "string"},
            "anon_key": {"type": "string"},
            "service_key": {"type": "string"}
        },
        "required": ["url"]
    }


@app.post("/mcp/tools/list")
async def list_tools():
    """List all available MCP tools"""
    return {
        "jsonrpc": "2.0",
        "id": 1,
        "result": {
            "tools": [
                {
                    "name": "supabase_query",
                    "description": "Query/Select data from a Supabase table with filters, pagination, and ordering",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "credentials": {
                                "type": "object",
                                "properties": {
                                    "url": {"type": "string", "description": "Supabase project URL"},
                                    "anon_key": {"type": "string", "description": "Anon/public API key"},
                                    "service_key": {"type": "string", "description": "Service role key (optional, for admin access)"}
                                },
                                "required": ["url"]
                            },
                            "table": {"type": "string", "description": "Table name"},
                            "select": {"type": "string", "default": "*", "description": "Columns to select"},
                            "filters": {"type": "object", "description": "Filter conditions (e.g., {'id': '1'} or {'age': {'gt': '18'}})"},
                            "limit": {"type": "integer", "description": "Max rows to return"},
                            "offset": {"type": "integer", "description": "Rows to skip"},
                            "order": {"type": "string", "description": "Order by column (e.g., 'created_at.desc')"}
                        },
                        "required": ["credentials", "table"]
                    }
                },
                {
                    "name": "supabase_insert",
                    "description": "Insert new row(s) into a Supabase table",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "credentials": _credentials_schema(),
                            "table": {"type": "string"},
                            "data": {"type": ["object", "array"], "description": "Single object or array of objects to insert"}
                        },
                        "required": ["credentials", "table", "data"]
                    }
                },
                {
                    "name": "supabase_update",
                    "description": "Update existing row(s) in a Supabase table",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "credentials": _credentials_schema(),
                            "table": {"type": "string"},
                            "data": {"type": "object", "description": "Data to update"},
                            "filters": {"type": "object", "description": "Which rows to update"}
                        },
                        "required": ["credentials", "table", "data", "filters"]
                    }
                },
                {
                    "name": "supabase_delete",
                    "description": "Delete row(s) from a Supabase table",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "credentials": _credentials_schema(),
                            "table": {"type": "string"},
                            "filters": {"type": "object", "description": "Which rows to delete"}
                        },
                        "required": ["credentials", "table", "filters"]
                    }
                },
                {
                    "name": "supabase_upsert",
                    "description": "Insert or update row(s) - insert new or update if exists",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "credentials": _credentials_schema(),
                            "table": {"type": "string"},
                            "data": {"type": ["object", "array"]},
                            "on_conflict": {"type": "string", "description": "Column(s) to check for conflicts"}
                        },
                        "required": ["credentials", "table", "data"]
                    }
                },
                {
                    "name": "supabase_sql",
                    "description": "Execute raw SQL query (requires service_key)",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "credentials": {
                                "type": "object",
                                "properties": {
                                    "url": {"type": "string"},
                                    "service_key": {"type": "string"}
                                },
                                "required": ["url", "service_key"]
                            },
                            "query": {"type": "string", "description": "SQL query to execute"},
                            "params": {"type": "object", "description": "Query parameters (optional)"}
                        },
                        "required": ["credentials", "query"]
                    }
                },
                {
                    "name": "supabase_rpc",
                    "description": "Call a Supabase RPC function (stored procedure)",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "credentials": _credentials_schema(),
                            "function_name": {"type": "string", "description": "Name of the RPC function"},
                            "params": {"type": "object", "description": "Function parameters"}
                        },
                        "required": ["credentials", "function_name"]
                    }
                },
                {
                    "name": "supabase_storage_upload",
                    "description": "Upload file to Supabase Storage",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "credentials": _credentials_schema(),
                            "bucket": {"type": "string", "description": "Storage bucket name"},
                            "path": {"type": "string", "description": "File path in bucket"},
                            "file_data": {"type": "string", "description": "Base64 encoded file data"},
                            "content_type": {"type": "string", "default": "application/octet-stream"}
                        },
                        "required": ["credentials", "bucket", "path", "file_data"]
                    }
                },
                {
                    "name": "supabase_storage_list",
                    "description": "List files in Supabase Storage bucket",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "credentials": _credentials_schema(),
                            "bucket": {"type": "string"},
                            "path": {"type": "string", "default": ""}
                        },
                        "required": ["credentials", "bucket"]
                    }
                }
            ]
        }
    }


def _jsonrpc_error(request_id: Any, code: int, message: Any) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error envelope."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message}
    }


@app.post("/mcp/tools/call")
async def call_tool(request: Request):
    """Execute an MCP tool.

    Expects a JSON-RPC style body with ``params.name`` (tool name) and
    ``params.arguments`` (tool arguments). Always answers with a JSON-RPC
    envelope; failures are reported in its ``error`` member.
    """
    try:
        body = await request.json()
    except ValueError:
        return _jsonrpc_error(None, -32700, "Parse error: request body is not valid JSON")
    if not isinstance(body, dict):
        return _jsonrpc_error(None, -32600, "Invalid request: body must be a JSON object")

    request_id = body.get("id", 1)
    params = body.get("params") or {}
    if not isinstance(params, dict):
        return _jsonrpc_error(request_id, -32600, "Invalid request: params must be an object")

    tool_name = params.get("name")
    arguments = params.get("arguments") or {}

    try:
        # Route to appropriate handler
        handlers = {
            "supabase_query": handle_query,
            "supabase_insert": handle_insert,
            "supabase_update": handle_update,
            "supabase_delete": handle_delete,
            "supabase_upsert": handle_upsert,
            "supabase_sql": handle_sql,
            "supabase_rpc": handle_rpc,
            "supabase_storage_upload": handle_storage_upload,
            "supabase_storage_list": handle_storage_list
        }

        handler = handlers.get(tool_name)
        if not handler:
            return _jsonrpc_error(request_id, 404, f"Tool not found: {tool_name}")

        result = await handler(arguments)

        if "error" in result:
            return _jsonrpc_error(request_id, result.get("code", 500), result["error"])

        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": result
        }
    except KeyError as e:
        # Handlers index required arguments directly; name the missing one.
        logger.error("Tool execution failed: missing argument %s", e)
        return _jsonrpc_error(request_id, 400, f"Missing required argument: {e}")
    except Exception as e:
        logger.exception("Tool execution failed: %s", e)
        return _jsonrpc_error(request_id, 500, str(e))


# ============================================================================
# TOOL HANDLERS
# ============================================================================

async def handle_query(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle supabase_query tool.

    Returns ``{"data", "count"}`` on success or ``{"error", "code"}``.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    select = args.get("select") or "*"
    filters = args.get("filters")
    limit = args.get("limit")
    offset = args.get("offset")
    order = args.get("order")

    # Build URL
    query_parts = [f"select={quote(str(select), safe=_QUERY_SAFE)}"]
    if filters:
        query_parts.append(build_filter_string(filters))
    # Compare against None so an explicit 0 is still sent.
    if limit is not None:
        query_parts.append(f"limit={int(limit)}")
    if offset is not None:
        query_parts.append(f"offset={int(offset)}")
    if order:
        query_parts.append(f"order={quote(str(order), safe=_QUERY_SAFE)}")

    url = _build_url(f"{_base_url(creds)}/rest/v1/{quote(str(table), safe='')}", *query_parts)
    result = await supabase_request("GET", url, get_api_key(creds))

    if "error" in result:
        return {"error": f"Supabase error: {json.dumps(result['error'])}", "code": result.get("status_code", 500)}

    return {"data": result["data"], "count": _count_rows(result["data"])}


async def handle_insert(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle supabase_insert tool.

    Returns ``{"data", "inserted"}`` on success or ``{"error", "code"}``.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    data = args["data"]

    url = f"{_base_url(creds)}/rest/v1/{quote(str(table), safe='')}"
    result = await supabase_request("POST", url, get_api_key(creds), data=data)

    if "error" in result:
        return {"error": f"Supabase error: {json.dumps(result['error'])}", "code": result.get("status_code", 400)}

    return {"data": result["data"], "inserted": _count_rows(result["data"])}


async def handle_update(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle supabase_update tool.

    Returns ``{"data", "updated"}`` on success or ``{"error", "code"}``.
    Empty filters are rejected so a call cannot update every row by accident.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    data = args["data"]
    filters = args["filters"]

    if not filters:
        return {"error": "Update requires at least one filter", "code": 400}

    url = _build_url(
        f"{_base_url(creds)}/rest/v1/{quote(str(table), safe='')}",
        build_filter_string(filters),
    )
    result = await supabase_request("PATCH", url, get_api_key(creds), data=data)

    if "error" in result:
        return {"error": f"Supabase error: {json.dumps(result['error'])}", "code": result.get("status_code", 400)}

    return {"data": result["data"], "updated": _count_rows(result["data"])}


async def handle_delete(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle supabase_delete tool.

    Returns ``{"data", "deleted"}`` on success or ``{"error", "code"}``.
    Empty filters are rejected so a call cannot delete every row by accident.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    filters = args["filters"]

    if not filters:
        return {"error": "Delete requires at least one filter", "code": 400}

    url = _build_url(
        f"{_base_url(creds)}/rest/v1/{quote(str(table), safe='')}",
        build_filter_string(filters),
    )
    result = await supabase_request("DELETE", url, get_api_key(creds))

    if "error" in result:
        return {"error": f"Supabase error: {json.dumps(result['error'])}", "code": result.get("status_code", 400)}

    rows = result.get("data") or []
    return {"data": rows, "deleted": _count_rows(rows)}


async def handle_upsert(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle supabase_upsert tool.

    Returns ``{"data", "upserted"}`` on success or ``{"error", "code"}``.
    """
    creds = SupabaseCredentials(**args["credentials"])
    table = args["table"]
    data = args["data"]
    on_conflict = args.get("on_conflict")

    # The conflict target is a query parameter, not a Prefer token.
    url = _build_url(
        f"{_base_url(creds)}/rest/v1/{quote(str(table), safe='')}",
        f"on_conflict={quote(str(on_conflict), safe=',')}" if on_conflict else "",
    )
    # headers_override replaces the default Prefer header wholesale, so
    # return=representation has to be repeated to get the rows back.
    headers_override = {"Prefer": "resolution=merge-duplicates,return=representation"}

    result = await supabase_request(
        "POST", url, get_api_key(creds), data=data, headers_override=headers_override
    )

    if "error" in result:
        return {"error": f"Supabase error: {json.dumps(result['error'])}", "code": result.get("status_code", 400)}

    return {"data": result["data"], "upserted": _count_rows(result["data"])}


async def handle_sql(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle supabase_sql tool - execute raw SQL.

    Posts the query to the ``exec_sql`` RPC endpoint using the service key.
    """
    creds = SupabaseCredentials(**args["credentials"])
    query = args["query"]
    params = args.get("params") or {}

    if not creds.service_key:
        return {"error": "SQL queries require service_key", "code": 403}

    # Use PostgREST's rpc endpoint or direct SQL execution
    # Note: Direct SQL might not be available via REST API, depends on setup
    url = f"{_base_url(creds)}/rest/v1/rpc/exec_sql"

    data = {"query": query}
    if params:
        data["params"] = params

    result = await supabase_request("POST", url, creds.service_key, data=data)

    if "error" in result:
        return {"error": f"SQL execution failed: {json.dumps(result['error'])}", "code": result.get("status_code", 500)}

    return {"data": result["data"]}


async def handle_rpc(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle supabase_rpc tool - call stored procedures"""
    creds = SupabaseCredentials(**args["credentials"])
    function_name = args["function_name"]
    # "or {}" also covers an explicit None (the legacy endpoint passes
    # params=None when omitted), so a JSON body is always sent.
    params = args.get("params") or {}

    url = f"{_base_url(creds)}/rest/v1/rpc/{quote(str(function_name), safe='')}"
    result = await supabase_request("POST", url, get_api_key(creds), data=params)

    if "error" in result:
        return {"error": f"RPC call failed: {json.dumps(result['error'])}", "code": result.get("status_code", 400)}

    return {"data": result["data"]}


async def handle_storage_upload(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle supabase_storage_upload tool.

    Decodes the base64 ``file_data`` and posts the raw bytes to the storage
    object endpoint. Returns ``{"data", "status": "uploaded"}`` on success or
    ``{"error", "code"}``.
    """
    creds = SupabaseCredentials(**args["credentials"])
    bucket = args["bucket"]
    path = args["path"]
    file_data = args["file_data"]
    content_type = args.get("content_type") or "application/octet-stream"

    api_key = get_api_key(creds)
    if not api_key:
        return {"error": _MISSING_KEY_ERROR, "code": 401}

    try:
        file_bytes = base64.b64decode(file_data)
    except ValueError as e:  # binascii.Error is a ValueError subclass
        return {"error": f"file_data is not valid base64: {e}", "code": 400}

    # Keep "/" unescaped in the object path so nested folders still work.
    url = (
        f"{_base_url(creds)}/storage/v1/object/"
        f"{quote(str(bucket), safe='')}/{quote(str(path), safe='/')}"
    )

    headers = {
        "apikey": api_key,
        "Authorization": f"Bearer {api_key}",
        "Content-Type": content_type
    }

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(url, headers=headers, content=file_bytes)
    except httpx.HTTPError as e:
        logger.exception("Storage upload failed: %s", e)
        return {"error": str(e), "code": 500}

    if response.status_code >= 400:
        return {"error": response.text, "code": response.status_code}

    return {"data": _parse_body(response), "status": "uploaded"}


async def handle_storage_list(args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle supabase_storage_list tool.

    Returns ``{"data", "count"}`` on success or ``{"error", "code"}``.
    """
    creds = SupabaseCredentials(**args["credentials"])
    bucket = args["bucket"]
    path = args.get("path") or ""

    # NOTE(review): the Storage list endpoint is understood to be a POST with
    # the prefix in the JSON body rather than a GET - confirm against the
    # Storage API reference.
    url = f"{_base_url(creds)}/storage/v1/object/list/{quote(str(bucket), safe='')}"
    result = await supabase_request("POST", url, get_api_key(creds), data={"prefix": path})

    if "error" in result:
        return {"error": f"Storage list failed: {json.dumps(result['error'])}", "code": result.get("status_code", 400)}

    return {"data": result["data"], "count": _count_rows(result["data"])}


# ============================================================================
# LEGACY REST API ENDPOINTS (backward compatibility)
# ============================================================================

def _legacy_response(result: Dict[str, Any]) -> Any:
    """Turn a handler result into an HTTP response.

    Error results become a JSONResponse whose status is the result's "code"
    (500 when absent); successful results are returned unchanged.
    """
    if "error" in result:
        return JSONResponse(status_code=result.get("code", 500), content=result)
    return result


@app.post("/api/query")
async def api_query(request: QueryRequest):
    """Legacy endpoint: query rows."""
    return _legacy_response(await handle_query(request.dict()))


@app.post("/api/insert")
async def api_insert(request: InsertRequest):
    """Legacy endpoint: insert rows."""
    return _legacy_response(await handle_insert(request.dict()))


@app.post("/api/update")
async def api_update(request: UpdateRequest):
    """Legacy endpoint: update rows."""
    return _legacy_response(await handle_update(request.dict()))


@app.post("/api/delete")
async def api_delete(request: DeleteRequest):
    """Legacy endpoint: delete rows."""
    return _legacy_response(await handle_delete(request.dict()))


@app.post("/api/rpc")
async def api_rpc(request: RPCRequest):
    """Legacy endpoint: call an RPC function."""
    return _legacy_response(await handle_rpc(request.dict()))


# ============================================================================
# INFO ENDPOINTS
# ============================================================================

@app.get("/")
async def root():
    """Describe the server and its endpoints."""
    return {
        "name": "Supabase MCP Server",
        "version": "2.0.0",
        "description": "Full-featured MCP server for Supabase",
        "features": [
            "CRUD operations (Query, Insert, Update, Delete, Upsert)",
            "Raw SQL queries",
            "RPC function calls",
            "Storage management",
            "Compatible with Claude Desktop and n8n",
            "Works with any Supabase project"
        ],
        "endpoints": {
            "mcp": {
                "list_tools": "POST /mcp/tools/list",
                "call_tool": "POST /mcp/tools/call"
            },
            "rest_api": {
                "query": "POST /api/query",
                "insert": "POST /api/insert",
                "update": "POST /api/update",
                "delete": "POST /api/delete",
                "rpc": "POST /api/rpc"
            }
        }
    }


@app.get("/health")
async def health():
    """Liveness probe."""
    return {"status": "healthy", "version": "2.0.0"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/romankonoplinai-arch/mcp_supabase'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.