# main.py
# MCP + REST server providing a JSON-file-backed "collections" database.
import os
import json
import uuid
import threading
from typing import Any, Dict, List, Optional, Tuple
from fastapi import FastAPI, HTTPException, Body
from fastmcp import FastMCP
# Path of the JSON file that backs the database; it sits in the same directory as this module.
DB_PATH = os.path.join(os.path.dirname(__file__), "db.json")
# Module-wide lock; the _db_* helpers hold it around each load/modify/save cycle.
_lock = threading.Lock()
# ---------- Storage helpers ----------
def load_db() -> Dict[str, List[Dict[str, Any]]]:
    """Load the database from ``DB_PATH``.

    Returns:
        The parsed top-level JSON object, or ``{}`` when the file is missing,
        cannot be decoded as UTF-8 JSON, or does not contain a JSON object at
        the top level.

    Raises:
        OSError: For I/O failures other than a missing file (for example a
            permission error); these are not treated as "empty database".
    """
    # Open directly instead of checking os.path.exists() first: the file can
    # disappear between the check and the open.
    try:
        with open(DB_PATH, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except FileNotFoundError:
        return {}
    except (json.JSONDecodeError, UnicodeDecodeError):
        # NOTE: an unreadable file is deliberately treated as an empty DB,
        # which means the next save overwrites it.
        return {}
    # Valid JSON that is not an object (e.g. "[]" or "null") would break every
    # caller that does db.get(...); treat it as invalid too.
    if not isinstance(data, dict):
        return {}
    return data
def save_db(db: Dict[str, List[Dict[str, Any]]]) -> None:
    """Write ``db`` to ``DB_PATH`` via a temporary file and an atomic rename.

    The JSON is first written to ``DB_PATH + ".tmp"``, flushed and fsynced,
    and only then moved over the real file with ``os.replace``. Readers
    therefore never observe a half-written database.

    Args:
        db: Mapping of collection name to list of items; must be
            JSON-serialisable.

    Raises:
        OSError: If writing or renaming fails.
        TypeError, ValueError: If ``db`` cannot be serialised by ``json.dump``.
    """
    tmp = DB_PATH + ".tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as fh:
            json.dump(db, fh, indent=2, ensure_ascii=False)
            # Push the data to disk before the rename publishes the file;
            # otherwise a crash could leave an empty or truncated DB_PATH.
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp, DB_PATH)
    except Exception:
        # Best-effort cleanup so a failed write does not leave a stale temp
        # file behind; the original error is what the caller needs to see.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
def ensure_collection(db: Dict[str, List[Dict[str, Any]]], collection: str) -> None:
    """Make sure ``db`` has an entry for ``collection``.

    A missing key is initialised to a new empty list; an existing entry is
    left untouched. ``db`` is modified in place and nothing is returned.
    """
    db.setdefault(collection, [])
def find_item(
    db: Dict[str, List[Dict[str, Any]]], collection: str, item_id: str
) -> Tuple[Optional[int], Optional[Dict[str, Any]]]:
    """Locate the first item in ``collection`` whose ``"id"`` matches ``item_id``.

    Both sides are compared after ``str()`` conversion, so a stored integer
    id ``2`` matches the lookup value ``"2"``.

    Returns:
        ``(index, item)`` for the first match, where ``item`` is the stored
        dict itself (not a copy); ``(None, None)`` when the collection is
        absent or no item matches.
    """
    wanted = str(item_id)
    records = db.get(collection, [])
    hit = next(
        (
            (position, record)
            for position, record in enumerate(records)
            if str(record.get("id")) == wanted
        ),
        None,
    )
    if hit is None:
        return None, None
    return hit
# ---------- Shared DB ops (used by REST + MCP) ----------
def _db_add_item(collection: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Create an item in ``collection`` and persist the database.

    The new item starts with a freshly generated UUID4 string under ``"id"``.
    Keys from ``payload`` are merged on top, so a payload that carries its
    own ``"id"`` chooses the item's id. A payload that is not a dict is
    ignored and the item contains only the generated id.

    Args:
        collection: Collection name; created if it does not exist yet.
        payload: Fields to store on the new item.

    Returns:
        The stored item, including its ``"id"``.

    Raises:
        HTTPException: 409 if the resulting id is already used by another
            item in the collection.
    """
    with _lock:
        db = load_db()
        ensure_collection(db, collection)
        item: Dict[str, Any] = {"id": str(uuid.uuid4())}
        if isinstance(payload, dict):
            item.update(payload)
        # Lookups return only the first item with a given id, so a duplicate
        # id would make the new item unreachable via get/update/delete.
        # find_item compares ids as strings, the same way lookups do.
        _, clash = find_item(db, collection, item["id"])
        if clash is not None:
            raise HTTPException(status_code=409, detail="Item id already exists")
        db[collection].append(item)
        save_db(db)
        return item
def _db_get_item(collection: str, item_id: str) -> Dict[str, Any]:
    """Fetch a single item from ``collection`` by id.

    The database is re-read from disk under the module lock on every call.

    Raises:
        HTTPException: 404 when no item with ``item_id`` exists.
    """
    with _lock:
        snapshot = load_db()
        found = find_item(snapshot, collection, item_id)[1]
    if found is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return found
def _db_list_items(collection: str) -> List[Dict[str, Any]]:
    """Return every item stored in ``collection``.

    An unknown collection yields an empty list rather than an error.
    """
    with _lock:
        snapshot = load_db()
    items = snapshot.get(collection, [])
    return items
def _db_delete_item(collection: str, item_id: str) -> None:
    """Remove the item with ``item_id`` from ``collection`` and persist the change.

    Raises:
        HTTPException: 404 when no item with ``item_id`` exists; nothing is
            written in that case.
    """
    with _lock:
        db = load_db()
        position, _ = find_item(db, collection, item_id)
        # find_item returns (None, None) on a miss, so a missing index means
        # there is no such item.
        if position is None:
            raise HTTPException(status_code=404, detail="Item not found")
        del db[collection][position]
        save_db(db)
def _db_update_item(collection: str, item_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Merge ``payload`` into an existing item and persist the database.

    This is a shallow merge: keys in ``payload`` overwrite the item's keys,
    and keys absent from ``payload`` are kept. A payload that is not a dict
    leaves the item unchanged.

    Args:
        collection: Collection that holds the item.
        item_id: Id of the item to update (compared as a string).
        payload: Fields to set on the item.

    Returns:
        The updated item.

    Raises:
        HTTPException: 404 when the item does not exist; 409 when ``payload``
            would change the item's ``"id"`` to one already used by a
            different item in the collection.
    """
    with _lock:
        db = load_db()
        idx, item = find_item(db, collection, item_id)
        if item is None:
            raise HTTPException(status_code=404, detail="Item not found")
        if isinstance(payload, dict):
            if "id" in payload:
                # Taking over another item's id would create a duplicate, and
                # lookups only ever return the first match. Ids are compared
                # as strings, the same way find_item compares them.
                requested_id = str(payload["id"])
                clash = any(
                    pos != idx and str(other.get("id")) == requested_id
                    for pos, other in enumerate(db[collection])
                )
                if clash:
                    raise HTTPException(status_code=409, detail="Item id already exists")
            # ``item`` is the dict stored inside ``db``, so updating it in
            # place is enough; no re-assignment into the list is needed.
            item.update(payload)
        save_db(db)
        return item
# ---------- FastAPI (REST) ----------
# FastAPI application: the REST routes are registered on it and the MCP app is mounted under it.
app = FastAPI(title="JSON DB API")
@app.post("/db/{collection}")
def add_item(collection: str, payload: Dict[str, Any] = Body(...)):
    # REST: create an item from the JSON request body and echo the stored
    # item back in an {"ok", "item"} envelope.
    created = _db_add_item(collection, payload)
    response = {"ok": True, "item": created}
    return response
@app.get("/db/{collection}/{item_id}")
def get_item(collection: str, item_id: str):
    # REST: return the stored item itself (no envelope); a missing item
    # surfaces as the 404 raised by _db_get_item.
    item = _db_get_item(collection, item_id)
    return item
@app.get("/db/{collection}")
def list_items(collection: str):
    # REST: return the collection's items as a bare JSON list.
    items = _db_list_items(collection)
    return items
@app.delete("/db/{collection}/{item_id}")
def delete_item(collection: str, item_id: str):
    # REST: delete the item, then acknowledge; a missing item surfaces as
    # the 404 raised by _db_delete_item.
    _db_delete_item(collection, item_id)
    acknowledgement = {"ok": True}
    return acknowledgement
@app.put("/db/{collection}/{item_id}")
def update_item(collection: str, item_id: str, payload: Dict[str, Any] = Body(...)):
    # REST: merge the JSON request body into the item and return the result
    # in an {"ok", "item"} envelope.
    updated = _db_update_item(collection, item_id, payload)
    response = {"ok": True, "item": updated}
    return response
# ---------- FastMCP (MCP tools) ----------
# MCP server instance; the @mcp.tool functions expose the same DB operations as the REST routes.
mcp = FastMCP(name="JsonCollectionsDB")
@mcp.tool(name="db.add_item", title="Add DB item", description="Add item to JSON DB")
def mcp_add_item(collection: str, payload: Dict[str, Any]):
    # MCP counterpart of the POST route: same DB helper, same response envelope.
    created = _db_add_item(collection, payload)
    return {"ok": True, "item": created}
@mcp.tool(name="db.get_item", title="Get DB item", description="Get item by id")
def mcp_get_item(collection: str, item_id: str):
    # MCP counterpart of the GET-by-id route; returns the stored item as-is.
    item = _db_get_item(collection, item_id)
    return item
@mcp.tool(name="db.list_items", title="List DB items", description="List items in a collection")
def mcp_list_items(collection: str):
    # MCP counterpart of the list route; returns the collection's items as a list.
    items = _db_list_items(collection)
    return items
@mcp.tool(name="db.delete_item", title="Delete DB item", description="Delete item by id")
def mcp_delete_item(collection: str, item_id: str):
    # MCP counterpart of the DELETE route: remove the item, then acknowledge.
    _db_delete_item(collection, item_id)
    acknowledgement = {"ok": True}
    return acknowledgement
@mcp.tool(name="db.update_item", title="Update DB item", description="Update an item")
def mcp_update_item(collection: str, item_id: str, payload: Dict[str, Any]):
    # MCP counterpart of the PUT route: same DB helper, same response envelope.
    updated = _db_update_item(collection, item_id, payload)
    return {"ok": True, "item": updated}
# Expose MCP (SSE) at /mcp so Copilot or other clients can connect
# Build the app returned by the MCP server's sse_app() and mount it as a
# sub-application of the REST app, so both are served by the same `app`.
mcp_sse_app = mcp.sse_app()
app.mount("/mcp", mcp_sse_app)
@app.get("/")
def health():
    # Health probe that also advertises where the REST and MCP endpoints live.
    endpoints = {"rest": "/db/{collection}", "mcp_sse": "/mcp/"}
    return {"ok": True, **endpoints}
if __name__ == "__main__":
    import uvicorn

    # Serve from a single process: the lock guarding db.json is a
    # threading.Lock, so it cannot prevent write races between processes.
    # NOTE(review): reload=True looks like a development setting; confirm
    # before deploying.
    uvicorn.run(
        "main:app",
        port=8000,
        host="0.0.0.0",
        reload=True,
    )