import os
from urllib.parse import quote

import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
# Load variables from a .env file (when one is found) before reading settings.
load_dotenv()

# Base URL of the CRUD service. Trailing slashes are stripped so that request
# paths beginning with "/" can be appended directly.
CRUD_URL = os.environ.get("CRUD_URL", "http://host.docker.internal:8000").rstrip("/")

# Value sent in the X-API-Key header on every request to the CRUD service.
SERVICE_API_KEY = os.environ.get("SERVICE_API_KEY", "dev-key")

# Bind address, port and URL path handed to mcp.run() when run as a script.
MCP_HOST = os.environ.get("MCP_HOST", "0.0.0.0")
MCP_PORT = int(os.environ.get("MCP_PORT", "8002"))
MCP_PATH = os.environ.get("MCP_PATH", "/mcp")

# Server instance on which the tool functions below are registered.
mcp = FastMCP(name="crud-mcp-tools")
def _headers() -> dict:
    """Build the HTTP headers attached to every CRUD service request.

    Returns:
        A new dict holding the ``X-API-Key`` header set to ``SERVICE_API_KEY``.
    """
    auth_headers = {}
    auth_headers["X-API-Key"] = SERVICE_API_KEY
    return auth_headers
async def _request(method: str, path: str, json: dict | None = None, params: dict | None = None) -> dict:
    """Send one authenticated request to the CRUD service and wrap the outcome.

    Args:
        method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
        path: Path appended verbatim to ``CRUD_URL``; callers pass it with a
            leading ``"/"``.
        json: Optional request body, sent as JSON.
        params: Optional query-string parameters.

    Returns:
        On a response with status below 400: ``{"ok": True, "data": ...}``,
        where ``data`` is the decoded JSON body, ``None`` when the body is
        empty (e.g. 204 No Content), or the raw response text when the body
        is not valid JSON.
        On a response with status 400 or above:
        ``{"ok": False, "status_code": <int>, "error": <body text>, "url": <url>}``.
        When httpx raises ``RequestError``: the same error shape with
        ``status_code`` set to ``None`` and ``error`` set to the exception text.
    """
    url = f"{CRUD_URL}{path}"

    async with httpx.AsyncClient(timeout=20) as client:
        # Keep the try body limited to the call that can raise RequestError.
        try:
            response = await client.request(
                method, url, headers=_headers(), json=json, params=params
            )
        except httpx.RequestError as exc:
            return {"ok": False, "status_code": None, "error": str(exc), "url": url}

    if response.status_code >= 400:
        return {
            "ok": False,
            "status_code": response.status_code,
            "error": response.text,
            "url": url,
        }

    # A successful response may carry no body at all; decoding it as JSON
    # would raise, so report "no data" instead.
    if not response.content:
        return {"ok": True, "data": None}

    try:
        data = response.json()
    except ValueError:
        # Body is present but not JSON (JSONDecodeError subclasses ValueError);
        # hand back the text rather than failing a request that succeeded.
        data = response.text
    return {"ok": True, "data": data}
# -------------------------
# Users tools
# -------------------------
@mcp.tool()
async def list_users(skip: int = 0, limit: int = 50) -> dict:
    """List users.

    Args:
        skip: Sent to the service as the ``skip`` query parameter.
        limit: Sent to the service as the ``limit`` query parameter.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code``, ``error`` and ``url``.
    """
    query = {"skip": skip, "limit": limit}
    result = await _request("GET", "/users", params=query)
    return result
@mcp.tool()
async def get_user(user_id: str) -> dict:
    """Get a user by ID.

    Args:
        user_id: Identifier of the user. It is percent-encoded before being
            placed in the request path.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code``, ``error`` and ``url``.
    """
    # Encode the ID as a single path segment so characters such as "/", "?"
    # or "#" cannot steer the request to a different endpoint or query.
    segment = quote(str(user_id), safe="")
    return await _request("GET", f"/users/{segment}")
@mcp.tool()
async def create_user(name: str, email: str, role: str = "user") -> dict:
    """Create a new user.

    Args:
        name: Sent as the ``name`` field of the request body.
        email: Sent as the ``email`` field of the request body.
        role: Sent as the ``role`` field of the request body; defaults to
            ``"user"``.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code``, ``error`` and ``url``.
    """
    body = {"name": name, "email": email, "role": role}
    outcome = await _request("POST", "/users", json=body)
    return outcome
@mcp.tool()
async def update_user(user_id: str, name: str | None = None, email: str | None = None, role: str | None = None) -> dict:
    """Update an existing user.

    Only the fields that are not None are included in the request body.

    Args:
        user_id: Identifier of the user. It is percent-encoded before being
            placed in the request path.
        name: New name, or None to leave it out of the request.
        email: New email, or None to leave it out of the request.
        role: New role, or None to leave it out of the request.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code``, ``error`` and ``url``.
    """
    candidate_fields = {"name": name, "email": email, "role": role}
    payload = {
        field: value
        for field, value in candidate_fields.items()
        if value is not None
    }
    # Encode the ID as a single path segment so characters such as "/", "?"
    # or "#" cannot steer the request to a different endpoint or query.
    segment = quote(str(user_id), safe="")
    return await _request("PUT", f"/users/{segment}", json=payload)
@mcp.tool()
async def delete_user(user_id: str, confirm: bool = False) -> dict:
    """Delete a user (requires confirm=true).

    Args:
        user_id: Identifier of the user. It is percent-encoded before being
            placed in the request path.
        confirm: Must be true for the deletion to be sent. When false, no
            request is made and an error dict with status_code 400 is returned.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code`` and ``error``.
    """
    if not confirm:
        return {"ok": False, "status_code": 400, "error": "Confirmation required. Call again with confirm=true."}
    # Encode the ID as a single path segment so characters such as "/", "?"
    # or "#" cannot point this destructive call at a different resource.
    segment = quote(str(user_id), safe="")
    return await _request("DELETE", f"/users/{segment}")
# -------------------------
# Products tools
# -------------------------
@mcp.tool()
async def list_products(skip: int = 0, limit: int = 50) -> dict:
    """List products.

    Args:
        skip: Sent to the service as the ``skip`` query parameter.
        limit: Sent to the service as the ``limit`` query parameter.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code``, ``error`` and ``url``.
    """
    query = {"skip": skip, "limit": limit}
    result = await _request("GET", "/products", params=query)
    return result
@mcp.tool()
async def get_product(product_id: str) -> dict:
    """Get a product by ID.

    Args:
        product_id: Identifier of the product. It is percent-encoded before
            being placed in the request path.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code``, ``error`` and ``url``.
    """
    # Encode the ID as a single path segment so characters such as "/", "?"
    # or "#" cannot steer the request to a different endpoint or query.
    segment = quote(str(product_id), safe="")
    return await _request("GET", f"/products/{segment}")
@mcp.tool()
async def create_product(name: str, price: float, description: str | None = None, owner_user_id: str | None = None) -> dict:
    """Create a new product.

    All four fields are always sent in the request body; ``description`` and
    ``owner_user_id`` are sent as null when not provided.

    Args:
        name: Sent as the ``name`` field of the request body.
        price: Sent as the ``price`` field of the request body.
        description: Sent as the ``description`` field of the request body.
        owner_user_id: Sent as the ``owner_user_id`` field of the request body.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code``, ``error`` and ``url``.
    """
    body = dict(
        name=name,
        price=price,
        description=description,
        owner_user_id=owner_user_id,
    )
    outcome = await _request("POST", "/products", json=body)
    return outcome
@mcp.tool()
async def update_product(product_id: str, name: str | None = None, price: float | None = None,
                         description: str | None = None, owner_user_id: str | None = None) -> dict:
    """Update an existing product.

    Only the fields that are not None are included in the request body.

    Args:
        product_id: Identifier of the product. It is percent-encoded before
            being placed in the request path.
        name: New name, or None to leave it out of the request.
        price: New price, or None to leave it out of the request.
        description: New description, or None to leave it out of the request.
        owner_user_id: New owner user ID, or None to leave it out of the
            request.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code``, ``error`` and ``url``.
    """
    candidate_fields = {
        "name": name,
        "price": price,
        "description": description,
        "owner_user_id": owner_user_id,
    }
    payload = {
        field: value
        for field, value in candidate_fields.items()
        if value is not None
    }
    # Encode the ID as a single path segment so characters such as "/", "?"
    # or "#" cannot steer the request to a different endpoint or query.
    segment = quote(str(product_id), safe="")
    return await _request("PUT", f"/products/{segment}", json=payload)
@mcp.tool()
async def delete_product(product_id: str, confirm: bool = False) -> dict:
    """Delete a product (requires confirm=true).

    Args:
        product_id: Identifier of the product. It is percent-encoded before
            being placed in the request path.
        confirm: Must be true for the deletion to be sent. When false, no
            request is made and an error dict with status_code 400 is returned.

    Returns:
        A dict with ``ok`` set to True and the response under ``data``, or
        ``ok`` set to False with ``status_code`` and ``error``.
    """
    if not confirm:
        return {"ok": False, "status_code": 400, "error": "Confirmation required. Call again with confirm=true."}
    # Encode the ID as a single path segment so characters such as "/", "?"
    # or "#" cannot point this destructive call at a different resource.
    segment = quote(str(product_id), safe="")
    return await _request("DELETE", f"/products/{segment}")
if __name__ == "__main__":
    # Start the MCP server over HTTP using the bind settings read from the
    # environment at import time.
    run_options = {
        "transport": "http",
        "host": MCP_HOST,
        "port": MCP_PORT,
        "path": MCP_PATH,
    }
    mcp.run(**run_options)