Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
auth_middleware_template.j2โ€ข7.75 kB
{# Jinja2 template for authentication middleware #} import os import time import jwt from typing import Dict, List, Optional, Any, Union from fastapi import Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.security.api_key import APIKeyHeader from pydantic import BaseModel # Configuration # In a real app, these would be in environment variables or a secure config API_KEY_NAME = "X-API-Key" API_KEY = "mock-api-key-{{ random_suffix }}" # For testing purposes JWT_SECRET_KEY = "mock-jwt-secret-{{ random_suffix }}" # For testing only JWT_ALGORITHM = "HS256" JWT_EXPIRATION_SECONDS = 3600 # 1 hour # Security schemes api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) token_auth_scheme = HTTPBearer(auto_error=False) # Mock user database # In a real application, this would be in a database mock_users_db = { "admin": { "username": "admin", "full_name": "Admin User", "email": "admin@example.com", "disabled": False, "roles": ["admin"] }, "user": { "username": "user", "full_name": "Regular User", "email": "user@example.com", "disabled": False, "roles": ["user"] }, "guest": { "username": "guest", "full_name": "Guest User", "email": "guest@example.com", "disabled": False, "roles": ["guest"] } } # Models class Token(BaseModel): access_token: str token_type: str expires_at: int user_info: Dict[str, Any] class TokenData(BaseModel): username: Optional[str] = None roles: List[str] = [] class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None roles: List[str] = [] def verify_api_key(api_key: str = Depends(api_key_header)) -> bool: """ Verify API key for authentication. Args: api_key: API key from X-API-Key header Returns: True if API key is valid Raises: HTTPException: If API key is invalid """ if api_key == API_KEY: return True raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key", headers={"WWW-Authenticate": "ApiKey"}, ) def get_user(username: str) -> User: """ Get user from mock database. Args: username: Username to lookup Returns: User object if found Raises: HTTPException: If user not found or disabled """ if username in mock_users_db: user_dict = mock_users_db[username] user = User(**user_dict) return user # For mock server, if username not in db but has format test_user_*, # create a test user on the fly if username.startswith("test_user_"): return User( username=username, email=f"{username}@example.com", full_name=f"Test User {username.split('_')[-1]}", disabled=False, roles=["test_user"] ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User {username} not found", ) def decode_token(token: str) -> TokenData: """ Decode JWT token and extract user data. Args: token: JWT token string Returns: TokenData containing username and roles Raises: HTTPException: If token is invalid or expired """ try: payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM]) username: str = payload.get("sub") if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token credentials", headers={"WWW-Authenticate": "Bearer"}, ) # Get roles from token payload roles = payload.get("roles", []) return TokenData(username=username, roles=roles) except jwt.PyJWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token credentials", headers={"WWW-Authenticate": "Bearer"}, ) async def verify_jwt_token(token: HTTPAuthorizationCredentials = Depends(token_auth_scheme)) -> User: """ Verify JWT token and return associated user. Args: token: Bearer token from Authorization header Returns: User associated with the token Raises: HTTPException: If token is invalid or user is disabled """ if token is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Bearer authentication required", headers={"WWW-Authenticate": "Bearer"}, ) token_data = decode_token(token.credentials) user = get_user(token_data.username) if user.disabled: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user", ) return user def create_access_token(data: Dict[str, Any], expires_delta: Optional[int] = None) -> str: """ Create JWT access token. Args: data: Data to encode in token expires_delta: Optional expiration time in seconds Returns: Encoded JWT token """ to_encode = data.copy() expire = time.time() + (expires_delta or JWT_EXPIRATION_SECONDS) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM) return encoded_jwt def generate_token_response(username: str, password: str) -> Token: """ Generate token response for OAuth2 compatible token endpoint. Args: username: Username for authentication password: Password for authentication (ignored in mock) Returns: Token response with access token Raises: HTTPException: If username is invalid """ # For mock API, any valid username works with any password # In a real app, you would verify the password here user = get_user(username) # Create token payload token_data = { "sub": user.username, "roles": user.roles } # Create token expires = JWT_EXPIRATION_SECONDS access_token = create_access_token(token_data, expires_delta=expires) # Create token response token_response = Token( access_token=access_token, token_type="bearer", expires_at=int(time.time() + expires), user_info={ "username": user.username, "full_name": user.full_name, "email": user.email, "roles": user.roles } ) return token_response def has_role(required_roles: List[str]): """ Dependency to check if user has required role. Args: required_roles: List of roles, any of which grants access Returns: Dependency function checking user roles """ async def role_checker(user: User = Depends(verify_jwt_token)): # Admin role has access to everything if "admin" in user.roles: return user # Check if user has any of the required roles for role in required_roles: if role in user.roles: return user raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Insufficient permissions. Required roles: {required_roles}" ) return role_checker

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server