Skip to main content
Glama
_new_middleware.py13.7 kB
"""New middleware-based approach for FastMCP 2.x compatibility.""" class UserContextMiddleware(BaseHTTPMiddleware): """Middleware to capture X-User-ID and X-Chat-ID headers and set context variables.""" async def dispatch(self, request: Request, call_next): # Extract user_id and chat_id from request headers user_id = request.headers.get("x-user-id") or request.query_params.get("user_id") chat_id = request.headers.get("x-chat-id") or request.query_params.get("chat_id") # Set the context variables for this request user_token = user_id_var.set(user_id) chat_token = chat_id_var.set(chat_id) # Also store session info if we have session_id session_id = request.query_params.get("session_id") if session_id and (user_id or chat_id): set_session_info(session_id, user_id, chat_id) try: response = await call_next(request) return response finally: user_id_var.reset(user_token) chat_id_var.reset(chat_token) class FilesystemFastMCP(FastMCP): """FastMCP with middleware injection for session context and admin routes.""" def http_app(self, **kwargs): """Override http_app to add middleware and custom routes.""" # Get the base app from parent app = super().http_app(**kwargs) # Add our middleware for capturing user context app.add_middleware(UserContextMiddleware) # Add admin and web routes self._add_admin_routes(app) return app def _add_admin_routes(self, app): """Add admin API and web UI routes to the app.""" def get_user_data_dir() -> Path: """Get the user data directory path.""" default_user_data_dir = Path(__file__).parent.parent / "user_data" return Path(os.environ.get("MCP_WORKSPACES_DIR", str(default_user_data_dir))) def get_web_config() -> Dict[str, Any]: cfg = load_config() return cfg.get("admin_web", {}) def get_folder_stats(folder_path: Path) -> Dict[str, Any]: """Get statistics for a folder.""" try: stat = folder_path.stat() file_count = 0 dir_count = 0 total_size = 0 for item in folder_path.rglob("*"): if item.is_file(): file_count += 1 total_size += item.stat().st_size elif item.is_dir(): dir_count += 1 return { "path": str(folder_path), "name": folder_path.name, "created_time": datetime.fromtimestamp(stat.st_ctime).isoformat(), "modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(), "accessed_time": datetime.fromtimestamp(stat.st_atime).isoformat(), "file_count": file_count, "dir_count": dir_count, "total_size": total_size, } except Exception as e: return {"error": str(e)} # Admin auth helper async def verify_admin_token(request: Request) -> bool: config = load_config() expected_token = config.get("admin_token") if not expected_token: return False auth_header = request.headers.get("authorization", "") if auth_header.startswith("Bearer "): token = auth_header[7:] return token == expected_token return False # ========== Admin API Endpoints ========== async def admin_stats(request: Request): if not await verify_admin_token(request): return JSONResponse({"success": False, "error": "Unauthorized"}, status_code=401) user_data_dir = get_user_data_dir() if not user_data_dir.exists(): return JSONResponse({ "success": True, "user_data_dir": str(user_data_dir), "total_workspaces": 0, "unique_users": 0, "total_size_bytes": 0, "total_size_human": "0 B", }) workspaces = [d for d in user_data_dir.iterdir() if d.is_dir()] unique_users = set() total_size = 0 for ws in workspaces: name = ws.name if "_" in name: user_part = name.rsplit("_", 1)[0] unique_users.add(user_part) else: unique_users.add(name) for f in ws.rglob("*"): if f.is_file(): total_size += f.stat().st_size def human_size(size): for unit in ["B", "KB", "MB", "GB", "TB"]: if size < 1024: return f"{size:.2f} {unit}" size /= 1024 return f"{size:.2f} PB" return JSONResponse({ "success": True, "user_data_dir": str(user_data_dir), "total_workspaces": len(workspaces), "unique_users": len(unique_users), "total_size_bytes": total_size, "total_size_human": human_size(total_size), }) async def admin_list_workspaces(request: Request): if not await verify_admin_token(request): return JSONResponse({"success": False, "error": "Unauthorized"}, status_code=401) user_id_filter = request.query_params.get("user_id") user_data_dir = get_user_data_dir() if not user_data_dir.exists(): return JSONResponse({"success": True, "workspaces": []}) workspaces = [] for ws in user_data_dir.iterdir(): if not ws.is_dir(): continue if user_id_filter and not ws.name.startswith(user_id_filter): continue workspaces.append(get_folder_stats(ws)) return JSONResponse({"success": True, "workspaces": workspaces}) async def admin_workspace_info(request: Request): if not await verify_admin_token(request): return JSONResponse({"success": False, "error": "Unauthorized"}, status_code=401) workspace_id = request.path_params.get("workspace_id") user_data_dir = get_user_data_dir() ws_path = user_data_dir / workspace_id if not ws_path.exists() or not ws_path.is_dir(): return JSONResponse({"success": False, "error": "Workspace not found"}, status_code=404) return JSONResponse({"success": True, "workspace": get_folder_stats(ws_path)}) async def admin_workspace_tree(request: Request): if not await verify_admin_token(request): return JSONResponse({"success": False, "error": "Unauthorized"}, status_code=401) workspace_id = request.path_params.get("workspace_id") max_depth = int(request.query_params.get("max_depth", "5")) user_data_dir = get_user_data_dir() ws_path = user_data_dir / workspace_id if not ws_path.exists() or not ws_path.is_dir(): return JSONResponse({"success": False, "error": "Workspace not found"}, status_code=404) def build_tree(path: Path, depth: int = 0): if depth > max_depth: return {"name": path.name, "type": "directory", "truncated": True} if path.is_file(): return { "name": path.name, "type": "file", "size": path.stat().st_size, } children = [] try: for item in sorted(path.iterdir(), key=lambda x: (x.is_file(), x.name)): children.append(build_tree(item, depth + 1)) except PermissionError: pass return { "name": path.name, "type": "directory", "children": children, } return JSONResponse({"success": True, "tree": build_tree(ws_path)}) async def admin_delete_workspace(request: Request): if not await verify_admin_token(request): return JSONResponse({"success": False, "error": "Unauthorized"}, status_code=401) workspace_id = request.path_params.get("workspace_id") confirm = request.query_params.get("confirm") if confirm != "yes": return JSONResponse({ "success": False, "error": "Add ?confirm=yes to confirm deletion" }, status_code=400) user_data_dir = get_user_data_dir() ws_path = user_data_dir / workspace_id if not ws_path.exists() or not ws_path.is_dir(): return JSONResponse({"success": False, "error": "Workspace not found"}, status_code=404) import shutil shutil.rmtree(ws_path) return JSONResponse({"success": True, "message": f"Workspace {workspace_id} deleted"}) # ========== Web UI Endpoints ========== def web_disabled_response(): return JSONResponse({"error": "Web UI is disabled"}, status_code=403) async def web_tree_endpoint(request: Request): web_cfg = get_web_config() if not web_cfg.get("enabled", False): return web_disabled_response() pwd = request.query_params.get("password", "") if pwd != web_cfg.get("password", ""): return JSONResponse({"error": "Invalid password"}, status_code=401) user_data_dir = get_user_data_dir() def build_tree(path: Path, rel: str = ""): if path.is_file(): return {"name": path.name, "type": "file", "path": rel} children = [] try: for item in sorted(path.iterdir(), key=lambda x: (x.is_file(), x.name)): child_rel = f"{rel}/{item.name}" if rel else item.name children.append(build_tree(item, child_rel)) except PermissionError: pass return {"name": path.name or "user_data", "type": "directory", "children": children, "path": rel} return JSONResponse(build_tree(user_data_dir)) async def web_file_endpoint(request: Request): web_cfg = get_web_config() if not web_cfg.get("enabled", False): return web_disabled_response() pwd = request.query_params.get("password", "") if pwd != web_cfg.get("password", ""): return JSONResponse({"error": "Invalid password"}, status_code=401) file_path = request.query_params.get("path", "") user_data_dir = get_user_data_dir() target = (user_data_dir / file_path).resolve() if not str(target).startswith(str(user_data_dir.resolve())): return JSONResponse({"error": "Access denied"}, status_code=403) if not target.exists() or not target.is_file(): return JSONResponse({"error": "File not found"}, status_code=404) try: content = target.read_text(encoding="utf-8", errors="replace") return JSONResponse({"content": content, "name": target.name}) except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) async def web_index(request: Request): web_cfg = get_web_config() if not web_cfg.get("enabled", False): return web_disabled_response() web_dir = Path(__file__).parent / "web" return FileResponse(web_dir / "index.html") async def web_static(request: Request): web_cfg = get_web_config() if not web_cfg.get("enabled", False): return web_disabled_response() rel = request.path_params.get("path") or "" web_dir = Path(__file__).parent / "web" target = (web_dir / rel).resolve() web_root = web_dir.resolve() if not str(target).startswith(str(web_root)) or not target.exists() or target.is_dir(): target = web_root / "index.html" return FileResponse(target) # Add routes to the app app.routes.extend([ # Admin Web UI Route("/admin/api/tree", endpoint=web_tree_endpoint, methods=["GET"]), Route("/admin/api/file", endpoint=web_file_endpoint, methods=["GET"]), Route("/admin", endpoint=web_index, methods=["GET"]), Route("/admin/{path:path}", endpoint=web_static, methods=["GET"]), # Admin API routes Route("/admin/stats", endpoint=admin_stats, methods=["GET"]), Route("/admin/workspaces", endpoint=admin_list_workspaces, methods=["GET"]), Route("/admin/workspace/{workspace_id}", endpoint=admin_workspace_info, methods=["GET"]), Route("/admin/workspace/{workspace_id}/tree", endpoint=admin_workspace_tree, methods=["GET"]), Route("/admin/workspace/{workspace_id}", endpoint=admin_delete_workspace, methods=["DELETE"]), ])

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/answerlink/MCP-Workspace-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server