"""Additional API routes for MCP Gateway."""
import io
import json
from contextlib import redirect_stdout
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from secure_mcp_gateway.api_models import (
PICKED_CONFIG_PATH,
ApiKeyRotateRequest,
ProjectAddUserRequest,
ProjectAssignConfigRequest,
ProjectCreateRequest,
ProjectExportRequest,
ProjectSearchRequest,
SuccessResponse,
SystemBackupRequest,
SystemResetRequest,
SystemRestoreRequest,
UserCreateRequest,
UserDeleteRequest,
UserGenerateApiKeyRequest,
UserSearchRequest,
UserUpdateRequest,
get_api_key,
)
from secure_mcp_gateway.cli import (
add_user_to_project,
assign_config_to_project,
create_project,
create_user,
delete_all_user_api_keys,
delete_user,
delete_user_api_key,
disable_user_api_key,
enable_user_api_key,
export_project,
generate_user_api_key,
get_project,
get_project_config,
get_user,
list_all_api_keys,
list_project_users,
# Project functions
list_projects,
list_user_api_keys,
list_user_projects,
# User functions
list_users,
remove_all_users_from_project,
remove_project,
remove_user_from_project,
rotate_user_api_key,
search_projects,
search_users,
system_backup,
# System functions
system_health_check,
system_reset,
system_restore,
unassign_config_from_project,
update_user,
)
# Create router for additional routes
router = APIRouter()
# =============================================================================
# PROJECT ENDPOINTS
# =============================================================================
@router.get("/api/v1/projects", response_model=SuccessResponse)
async def get_projects(api_key: str = Depends(get_api_key)):
"""List all projects."""
try:
f = io.StringIO()
with redirect_stdout(f):
list_projects(PICKED_CONFIG_PATH)
projects = json.loads(f.getvalue())
return SuccessResponse(message="Projects retrieved successfully", data=projects)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/projects", response_model=SuccessResponse)
async def create_project_endpoint(
request: ProjectCreateRequest, api_key: str = Depends(get_api_key)
):
"""Create a new project."""
try:
f = io.StringIO()
with redirect_stdout(f):
create_project(PICKED_CONFIG_PATH, request.project_name)
result = f.getvalue().strip()
project_id = result.split(": ")[1] if ": " in result else result
return SuccessResponse(
message="Project created successfully",
data={"project_id": project_id, "project_name": request.project_name},
)
except SystemExit:
raise HTTPException(status_code=400, detail="Project creation failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/projects/{project_identifier}", response_model=SuccessResponse)
async def get_project_endpoint(
project_identifier: str, api_key: str = Depends(get_api_key)
):
"""Get a specific project."""
try:
f = io.StringIO()
with redirect_stdout(f):
get_project(PICKED_CONFIG_PATH, project_identifier)
project_data = json.loads(f.getvalue())
return SuccessResponse(
message="Project retrieved successfully", data=project_data
)
except SystemExit:
raise HTTPException(status_code=404, detail="Project not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/projects/{project_identifier}", response_model=SuccessResponse)
async def delete_project_endpoint(
project_identifier: str, api_key: str = Depends(get_api_key)
):
"""Delete a project."""
try:
f = io.StringIO()
with redirect_stdout(f):
remove_project(PICKED_CONFIG_PATH, project_identifier)
return SuccessResponse(message="Project deleted successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="Project deletion failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/api/v1/projects/{project_identifier}/assign-config",
response_model=SuccessResponse,
)
async def assign_config_to_project_endpoint(
project_identifier: str,
request: ProjectAssignConfigRequest,
api_key: str = Depends(get_api_key),
):
"""Assign a configuration to a project."""
try:
config_identifier = request.config_name or request.config_id
if not config_identifier:
raise HTTPException(
status_code=400, detail="Either config_name or config_id is required"
)
f = io.StringIO()
with redirect_stdout(f):
assign_config_to_project(
PICKED_CONFIG_PATH, project_identifier, config_identifier
)
return SuccessResponse(message="Configuration assigned to project successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="Configuration assignment failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/api/v1/projects/{project_identifier}/unassign-config",
response_model=SuccessResponse,
)
async def unassign_config_from_project_endpoint(
project_identifier: str, api_key: str = Depends(get_api_key)
):
"""Unassign configuration from a project."""
try:
f = io.StringIO()
with redirect_stdout(f):
unassign_config_from_project(PICKED_CONFIG_PATH, project_identifier)
return SuccessResponse(
message="Configuration unassigned from project successfully"
)
except SystemExit:
raise HTTPException(status_code=400, detail="Configuration unassignment failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get(
"/api/v1/projects/{project_identifier}/config", response_model=SuccessResponse
)
async def get_project_config_endpoint(
project_identifier: str, api_key: str = Depends(get_api_key)
):
"""Get configuration assigned to a project."""
try:
f = io.StringIO()
with redirect_stdout(f):
get_project_config(PICKED_CONFIG_PATH, project_identifier)
config_data = json.loads(f.getvalue())
return SuccessResponse(
message="Project configuration retrieved successfully", data=config_data
)
except SystemExit:
raise HTTPException(
status_code=404, detail="Project or configuration not found"
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get(
"/api/v1/projects/{project_identifier}/users", response_model=SuccessResponse
)
async def get_project_users(
project_identifier: str, api_key: str = Depends(get_api_key)
):
"""List users in a project."""
try:
f = io.StringIO()
with redirect_stdout(f):
list_project_users(PICKED_CONFIG_PATH, project_identifier)
users = json.loads(f.getvalue())
return SuccessResponse(
message="Project users retrieved successfully", data=users
)
except SystemExit:
raise HTTPException(status_code=404, detail="Project not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/api/v1/projects/{project_identifier}/users", response_model=SuccessResponse
)
async def add_user_to_project_endpoint(
project_identifier: str,
request: ProjectAddUserRequest,
api_key: str = Depends(get_api_key),
):
"""Add a user to a project."""
try:
user_identifier = request.user_id or request.email
if not user_identifier:
raise HTTPException(
status_code=400, detail="Either user_id or email is required"
)
f = io.StringIO()
with redirect_stdout(f):
add_user_to_project(PICKED_CONFIG_PATH, project_identifier, user_identifier)
return SuccessResponse(message="User added to project successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="User addition failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete(
"/api/v1/projects/{project_identifier}/users/{user_identifier}",
response_model=SuccessResponse,
)
async def remove_user_from_project_endpoint(
project_identifier: str, user_identifier: str, api_key: str = Depends(get_api_key)
):
"""Remove a user from a project."""
try:
f = io.StringIO()
with redirect_stdout(f):
remove_user_from_project(
PICKED_CONFIG_PATH, project_identifier, user_identifier
)
return SuccessResponse(message="User removed from project successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="User removal failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete(
"/api/v1/projects/{project_identifier}/users", response_model=SuccessResponse
)
async def remove_all_users_from_project_endpoint(
project_identifier: str, api_key: str = Depends(get_api_key)
):
"""Remove all users from a project."""
try:
f = io.StringIO()
with redirect_stdout(f):
remove_all_users_from_project(PICKED_CONFIG_PATH, project_identifier)
return SuccessResponse(message="All users removed from project successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="User removal failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/api/v1/projects/{project_identifier}/export", response_model=SuccessResponse
)
async def export_project_endpoint(
project_identifier: str,
request: ProjectExportRequest,
api_key: str = Depends(get_api_key),
):
"""Export a project to file."""
try:
f = io.StringIO()
with redirect_stdout(f):
export_project(PICKED_CONFIG_PATH, project_identifier, request.output_file)
return SuccessResponse(message="Project exported successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="Project export failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/projects/search", response_model=SuccessResponse)
async def search_projects_endpoint(
request: ProjectSearchRequest, api_key: str = Depends(get_api_key)
):
"""Search projects."""
try:
f = io.StringIO()
with redirect_stdout(f):
search_projects(PICKED_CONFIG_PATH, request.search_term)
results = json.loads(f.getvalue())
return SuccessResponse(message="Search completed successfully", data=results)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# USER ENDPOINTS
# =============================================================================
@router.get("/api/v1/users", response_model=SuccessResponse)
async def get_users(api_key: str = Depends(get_api_key)):
"""List all users."""
try:
f = io.StringIO()
with redirect_stdout(f):
list_users(PICKED_CONFIG_PATH)
users = json.loads(f.getvalue())
return SuccessResponse(message="Users retrieved successfully", data=users)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/users", response_model=SuccessResponse)
async def create_user_endpoint(
request: UserCreateRequest, api_key: str = Depends(get_api_key)
):
"""Create a new user."""
try:
f = io.StringIO()
with redirect_stdout(f):
create_user(PICKED_CONFIG_PATH, request.email)
result = f.getvalue().strip()
user_id = result.split(": ")[1] if ": " in result else result
return SuccessResponse(
message="User created successfully",
data={"user_id": user_id, "email": request.email},
)
except SystemExit:
raise HTTPException(status_code=400, detail="User creation failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/users/{user_identifier}", response_model=SuccessResponse)
async def get_user_endpoint(user_identifier: str, api_key: str = Depends(get_api_key)):
"""Get a specific user."""
try:
f = io.StringIO()
with redirect_stdout(f):
get_user(PICKED_CONFIG_PATH, user_identifier)
user_data = json.loads(f.getvalue())
return SuccessResponse(message="User retrieved successfully", data=user_data)
except SystemExit:
raise HTTPException(status_code=404, detail="User not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.put("/api/v1/users/{user_identifier}", response_model=SuccessResponse)
async def update_user_endpoint(
user_identifier: str,
request: UserUpdateRequest,
api_key: str = Depends(get_api_key),
):
"""Update a user."""
try:
f = io.StringIO()
with redirect_stdout(f):
update_user(PICKED_CONFIG_PATH, user_identifier, request.new_email)
return SuccessResponse(message="User updated successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="User update failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/users/{user_identifier}", response_model=SuccessResponse)
async def delete_user_endpoint(
user_identifier: str,
request: UserDeleteRequest,
api_key: str = Depends(get_api_key),
):
"""Delete a user."""
try:
f = io.StringIO()
with redirect_stdout(f):
delete_user(PICKED_CONFIG_PATH, user_identifier, request.force)
return SuccessResponse(message="User deleted successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="User deletion failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/users/{user_identifier}/projects", response_model=SuccessResponse)
async def get_user_projects(user_identifier: str, api_key: str = Depends(get_api_key)):
"""List projects for a specific user."""
try:
f = io.StringIO()
with redirect_stdout(f):
list_user_projects(PICKED_CONFIG_PATH, user_identifier)
projects = json.loads(f.getvalue())
return SuccessResponse(
message="User projects retrieved successfully", data=projects
)
except SystemExit:
raise HTTPException(status_code=404, detail="User not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/users/{user_identifier}/api-keys", response_model=SuccessResponse)
async def generate_user_api_key_endpoint(
user_identifier: str,
request: UserGenerateApiKeyRequest,
api_key: str = Depends(get_api_key),
):
"""Generate an API key for a user."""
try:
project_identifier = request.project_name or request.project_id
if not project_identifier:
raise HTTPException(
status_code=400, detail="Either project_name or project_id is required"
)
f = io.StringIO()
with redirect_stdout(f):
generate_user_api_key(
PICKED_CONFIG_PATH, user_identifier, project_identifier
)
result = f.getvalue().strip()
new_api_key = result.split(": ")[1] if ": " in result else result
return SuccessResponse(
message="API key generated successfully", data={"api_key": new_api_key}
)
except SystemExit:
raise HTTPException(status_code=400, detail="API key generation failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/users/{user_identifier}/api-keys", response_model=SuccessResponse)
async def get_user_api_keys(
user_identifier: str,
project_identifier: Optional[str] = None,
api_key: str = Depends(get_api_key),
):
"""List API keys for a user."""
try:
f = io.StringIO()
with redirect_stdout(f):
list_user_api_keys(PICKED_CONFIG_PATH, user_identifier, project_identifier)
api_keys = json.loads(f.getvalue())
return SuccessResponse(
message="User API keys retrieved successfully", data=api_keys
)
except SystemExit:
raise HTTPException(status_code=404, detail="User not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete(
"/api/v1/users/{user_identifier}/api-keys", response_model=SuccessResponse
)
async def delete_all_user_api_keys_endpoint(
user_identifier: str, api_key: str = Depends(get_api_key)
):
"""Delete all API keys for a user."""
try:
f = io.StringIO()
with redirect_stdout(f):
delete_all_user_api_keys(PICKED_CONFIG_PATH, user_identifier)
return SuccessResponse(message="All user API keys deleted successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="API key deletion failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/v1/api-keys", response_model=SuccessResponse)
async def get_all_api_keys(api_key: str = Depends(get_api_key)):
"""List all API keys across all users."""
try:
f = io.StringIO()
with redirect_stdout(f):
list_all_api_keys(PICKED_CONFIG_PATH)
all_keys = json.loads(f.getvalue())
return SuccessResponse(
message="All API keys retrieved successfully", data=all_keys
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/api-keys/rotate", response_model=SuccessResponse)
async def rotate_api_key_endpoint(
request: ApiKeyRotateRequest, api_key: str = Depends(get_api_key)
):
"""Rotate an API key."""
try:
f = io.StringIO()
with redirect_stdout(f):
rotate_user_api_key(PICKED_CONFIG_PATH, request.api_key)
result = f.getvalue().strip()
new_api_key = result.split(": ")[1] if ": " in result else result
return SuccessResponse(
message="API key rotated successfully", data={"new_api_key": new_api_key}
)
except SystemExit:
raise HTTPException(status_code=400, detail="API key rotation failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/api-keys/{api_key}/disable", response_model=SuccessResponse)
async def disable_api_key_endpoint(
api_key: str, api_key_dep: str = Depends(get_api_key)
):
"""Disable an API key."""
try:
f = io.StringIO()
with redirect_stdout(f):
disable_user_api_key(PICKED_CONFIG_PATH, api_key)
return SuccessResponse(message="API key disabled successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="API key disable failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/api-keys/{api_key}/enable", response_model=SuccessResponse)
async def enable_api_key_endpoint(
api_key: str, api_key_dep: str = Depends(get_api_key)
):
"""Enable an API key."""
try:
f = io.StringIO()
with redirect_stdout(f):
enable_user_api_key(PICKED_CONFIG_PATH, api_key)
return SuccessResponse(message="API key enabled successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="API key enable failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/api/v1/api-keys/{api_key}", response_model=SuccessResponse)
async def delete_api_key_endpoint(
api_key: str, api_key_dep: str = Depends(get_api_key)
):
"""Delete an API key."""
try:
f = io.StringIO()
with redirect_stdout(f):
delete_user_api_key(PICKED_CONFIG_PATH, api_key)
return SuccessResponse(message="API key deleted successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="API key deletion failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/users/search", response_model=SuccessResponse)
async def search_users_endpoint(
request: UserSearchRequest, api_key: str = Depends(get_api_key)
):
"""Search users."""
try:
f = io.StringIO()
with redirect_stdout(f):
search_users(PICKED_CONFIG_PATH, request.search_term)
results = json.loads(f.getvalue())
return SuccessResponse(message="Search completed successfully", data=results)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# SYSTEM ENDPOINTS
# =============================================================================
@router.get("/api/v1/system/health", response_model=SuccessResponse)
async def system_health_check_endpoint(api_key: str = Depends(get_api_key)):
"""Check system health."""
try:
f = io.StringIO()
with redirect_stdout(f):
system_health_check(PICKED_CONFIG_PATH)
health_data = json.loads(f.getvalue())
return SuccessResponse(
message="System health check completed", data=health_data
)
except SystemExit:
raise HTTPException(status_code=500, detail="System health check failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/system/backup", response_model=SuccessResponse)
async def system_backup_endpoint(
request: SystemBackupRequest, api_key: str = Depends(get_api_key)
):
"""Backup system configuration."""
try:
f = io.StringIO()
with redirect_stdout(f):
system_backup(PICKED_CONFIG_PATH, request.output_file)
return SuccessResponse(message="System backup completed successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="System backup failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/system/restore", response_model=SuccessResponse)
async def system_restore_endpoint(
request: SystemRestoreRequest, api_key: str = Depends(get_api_key)
):
"""Restore system configuration."""
try:
f = io.StringIO()
with redirect_stdout(f):
system_restore(PICKED_CONFIG_PATH, request.input_file)
return SuccessResponse(message="System restore completed successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="System restore failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/v1/system/reset", response_model=SuccessResponse)
async def system_reset_endpoint(
request: SystemResetRequest, api_key: str = Depends(get_api_key)
):
"""Reset system configuration."""
try:
f = io.StringIO()
with redirect_stdout(f):
system_reset(PICKED_CONFIG_PATH, request.confirm)
return SuccessResponse(message="System reset completed successfully")
except SystemExit:
raise HTTPException(status_code=400, detail="System reset failed")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))