#!/usr/bin/env python3
"""
AAP Gateway Authentication & Access Management Tool
"""
from typing import Any, Dict, Optional, Union
from fastmcp import FastMCP
from pydantic import Field
from connectors.gateway_connector import get_gateway_connector
def _normalize_id(value: Optional[Union[int, float]]) -> Optional[Union[int, float]]:
    """Return whole-number float IDs as ``int``; pass every other value through.

    The tool's ID parameters accept ``Union[int, float]`` and are interpolated
    into URL paths. Without this step ``5.0`` would render as ``"5.0"`` in the
    path instead of ``"5"``. Non-integral floats, ``None`` and non-float values
    are returned unchanged.
    """
    if isinstance(value, float) and value.is_integer():
        return int(value)
    return value


def register_gateway_auth_tools(mcp: FastMCP):
    """Register the Gateway authentication and access management tool.

    Defines ``gateway_auth_management`` and registers it on ``mcp`` through the
    ``mcp.tool()`` decorator. The tool is a single entry point that dispatches
    on its ``action`` argument to requests made with the client returned by
    ``get_gateway_connector()``.

    Args:
        mcp: The FastMCP server instance the tool is registered on.
    """

    @mcp.tool()
    def gateway_auth_management(
        action: str = Field(description="Action: list_users, create_user, update_user, delete_user, get_user, list_teams, create_team, update_team, delete_team, list_organizations, create_organization, update_organization, delete_organization, list_role_definitions, create_role_definition, update_role_definition, delete_role_definition, list_role_assignments, create_role_assignment, delete_role_assignment, list_authenticators, create_authenticator, update_authenticator, delete_authenticator, list_authenticator_maps, create_authenticator_map, update_authenticator_map, delete_authenticator_map, list_applications, create_application, update_application, delete_application, list_tokens, create_token, delete_token, get_me, get_session, list_authenticator_plugins"),
        user_id: Optional[Union[int, float]] = Field(None, description="User ID"),
        team_id: Optional[Union[int, float]] = Field(None, description="Team ID"),
        organization_id: Optional[Union[int, float]] = Field(None, description="Organization ID"),
        role_definition_id: Optional[Union[int, float]] = Field(None, description="Role definition ID"),
        role_assignment_id: Optional[Union[int, float]] = Field(None, description="Role assignment ID"),
        authenticator_id: Optional[Union[int, float]] = Field(None, description="Authenticator ID"),
        authenticator_map_id: Optional[Union[int, float]] = Field(None, description="Authenticator map ID"),
        application_id: Optional[Union[int, float]] = Field(None, description="OAuth application ID"),
        token_id: Optional[Union[int, float]] = Field(None, description="Token ID"),
        user_data: Optional[Dict[str, Any]] = Field(None, description="User data"),
        team_data: Optional[Dict[str, Any]] = Field(None, description="Team data"),
        organization_data: Optional[Dict[str, Any]] = Field(None, description="Organization data"),
        role_definition_data: Optional[Dict[str, Any]] = Field(None, description="Role definition data"),
        role_assignment_data: Optional[Dict[str, Any]] = Field(None, description="Role assignment data"),
        authenticator_data: Optional[Dict[str, Any]] = Field(None, description="Authenticator data"),
        authenticator_map_data: Optional[Dict[str, Any]] = Field(None, description="Authenticator map data"),
        application_data: Optional[Dict[str, Any]] = Field(None, description="OAuth application data"),
        token_data: Optional[Dict[str, Any]] = Field(None, description="Token data"),
        filters: Optional[Dict[str, Any]] = Field(None, description="Filters for listing")
    ) -> Dict[str, Any]:
        """
        Gateway authentication and access management tool.
        Handles users, teams, organizations, roles, authenticators, and OAuth applications.
        """
        # NOTE(review): the docstring above is left untouched because the MCP
        # framework presumably publishes it as the tool description - confirm
        # before rewording it.
        #
        # Contract, as implemented below:
        #   * "list_*" actions send ``filters`` (or an empty dict) as params.
        #   * "create_*" actions need the matching ``*_data`` dict.
        #   * "update_*" actions need both the matching ``*_id`` and ``*_data``.
        #   * "get_user" / "delete_*" actions need the matching ``*_id``.
        #   * A missing required argument or an unknown action returns
        #     ``{"error": ...}`` without contacting the gateway.
        #   * Any exception raised while handling the action is converted to
        #     ``{"error": "Gateway authentication management failed: ..."}``.

        # IDs are typed Union[int, float]; collapse whole-number floats so a
        # path such as f"users/{user_id}/" renders "users/5/" rather than
        # "users/5.0/".
        user_id = _normalize_id(user_id)
        team_id = _normalize_id(team_id)
        organization_id = _normalize_id(organization_id)
        role_definition_id = _normalize_id(role_definition_id)
        role_assignment_id = _normalize_id(role_assignment_id)
        authenticator_id = _normalize_id(authenticator_id)
        authenticator_map_id = _normalize_id(authenticator_map_id)
        application_id = _normalize_id(application_id)
        token_id = _normalize_id(token_id)

        try:
            client = get_gateway_connector()

            # User Operations
            if action == "list_users":
                params = filters or {}
                return client.get("users/", params)
            elif action == "create_user":
                if not user_data:
                    return {"error": "user_data is required for creating users"}
                return client.post("users/", user_data)
            elif action == "get_user":
                if not user_id:
                    return {"error": "user_id is required for getting user details"}
                return client.get(f"users/{user_id}/")
            elif action == "update_user":
                if not user_id or not user_data:
                    return {"error": "user_id and user_data are required for updating users"}
                return client.patch(f"users/{user_id}/", user_data)
            elif action == "delete_user":
                if not user_id:
                    return {"error": "user_id is required for deleting users"}
                return client.delete(f"users/{user_id}/")

            # Team Operations
            elif action == "list_teams":
                params = filters or {}
                return client.get("teams/", params)
            elif action == "create_team":
                if not team_data:
                    return {"error": "team_data is required for creating teams"}
                return client.post("teams/", team_data)
            elif action == "update_team":
                if not team_id or not team_data:
                    return {"error": "team_id and team_data are required for updating teams"}
                return client.patch(f"teams/{team_id}/", team_data)
            elif action == "delete_team":
                if not team_id:
                    return {"error": "team_id is required for deleting teams"}
                return client.delete(f"teams/{team_id}/")

            # Organization Operations
            elif action == "list_organizations":
                params = filters or {}
                return client.get("organizations/", params)
            elif action == "create_organization":
                if not organization_data:
                    return {"error": "organization_data is required for creating organizations"}
                return client.post("organizations/", organization_data)
            elif action == "update_organization":
                if not organization_id or not organization_data:
                    return {"error": "organization_id and organization_data are required"}
                return client.patch(f"organizations/{organization_id}/", organization_data)
            elif action == "delete_organization":
                if not organization_id:
                    return {"error": "organization_id is required for deleting organizations"}
                return client.delete(f"organizations/{organization_id}/")

            # Role Definition Operations
            elif action == "list_role_definitions":
                params = filters or {}
                return client.get("role_definitions/", params)
            elif action == "create_role_definition":
                if not role_definition_data:
                    return {"error": "role_definition_data is required for creating role definitions"}
                return client.post("role_definitions/", role_definition_data)
            elif action == "update_role_definition":
                if not role_definition_id or not role_definition_data:
                    return {"error": "role_definition_id and role_definition_data are required"}
                return client.patch(f"role_definitions/{role_definition_id}/", role_definition_data)
            elif action == "delete_role_definition":
                if not role_definition_id:
                    return {"error": "role_definition_id is required for deleting role definitions"}
                return client.delete(f"role_definitions/{role_definition_id}/")

            # Role Assignment Operations
            elif action == "list_role_assignments":
                # User and team assignments live on separate endpoints; query
                # both with the same filters and return them side by side.
                user_assignments = client.get("role_user_assignments/", filters or {})
                team_assignments = client.get("role_team_assignments/", filters or {})
                return {
                    "user_assignments": user_assignments,
                    "team_assignments": team_assignments
                }
            elif action == "create_role_assignment":
                if not role_assignment_data:
                    return {"error": "role_assignment_data is required for creating role assignments"}
                # The payload's keys select the endpoint; "user" wins when both
                # "user" and "team" are present.
                if "user" in role_assignment_data:
                    return client.post("role_user_assignments/", role_assignment_data)
                elif "team" in role_assignment_data:
                    return client.post("role_team_assignments/", role_assignment_data)
                else:
                    return {"error": "role_assignment_data must specify either 'user' or 'team'"}
            elif action == "delete_role_assignment":
                if not role_assignment_id:
                    return {"error": "role_assignment_id is required for deleting role assignments"}
                # The caller does not say which kind of assignment this is, so
                # try the user endpoint first and fall back to the team one.
                # Only ``Exception`` is caught: a bare ``except`` would also
                # swallow KeyboardInterrupt / SystemExit and then issue a
                # second request.
                # NOTE(review): if user and team assignments use independent ID
                # sequences, this can delete a user assignment when a team
                # assignment was intended - confirm against the gateway API.
                try:
                    return client.delete(f"role_user_assignments/{role_assignment_id}/")
                except Exception:
                    return client.delete(f"role_team_assignments/{role_assignment_id}/")

            # Authenticator Operations
            elif action == "list_authenticators":
                params = filters or {}
                return client.get("authenticators/", params)
            elif action == "create_authenticator":
                if not authenticator_data:
                    return {"error": "authenticator_data is required for creating authenticators"}
                return client.post("authenticators/", authenticator_data)
            elif action == "update_authenticator":
                if not authenticator_id or not authenticator_data:
                    return {"error": "authenticator_id and authenticator_data are required"}
                return client.patch(f"authenticators/{authenticator_id}/", authenticator_data)
            elif action == "delete_authenticator":
                if not authenticator_id:
                    return {"error": "authenticator_id is required for deleting authenticators"}
                return client.delete(f"authenticators/{authenticator_id}/")

            # Authenticator Map Operations
            elif action == "list_authenticator_maps":
                params = filters or {}
                return client.get("authenticator_maps/", params)
            elif action == "create_authenticator_map":
                if not authenticator_map_data:
                    return {"error": "authenticator_map_data is required for creating authenticator maps"}
                return client.post("authenticator_maps/", authenticator_map_data)
            elif action == "update_authenticator_map":
                if not authenticator_map_id or not authenticator_map_data:
                    return {"error": "authenticator_map_id and authenticator_map_data are required"}
                return client.patch(f"authenticator_maps/{authenticator_map_id}/", authenticator_map_data)
            elif action == "delete_authenticator_map":
                if not authenticator_map_id:
                    return {"error": "authenticator_map_id is required for deleting authenticator maps"}
                return client.delete(f"authenticator_maps/{authenticator_map_id}/")

            # OAuth Application Operations
            elif action == "list_applications":
                params = filters or {}
                return client.get("applications/", params)
            elif action == "create_application":
                if not application_data:
                    return {"error": "application_data is required for creating OAuth applications"}
                return client.post("applications/", application_data)
            elif action == "update_application":
                if not application_id or not application_data:
                    return {"error": "application_id and application_data are required"}
                return client.patch(f"applications/{application_id}/", application_data)
            elif action == "delete_application":
                if not application_id:
                    return {"error": "application_id is required for deleting OAuth applications"}
                return client.delete(f"applications/{application_id}/")

            # Token Operations
            elif action == "list_tokens":
                params = filters or {}
                return client.get("tokens/", params)
            elif action == "create_token":
                if not token_data:
                    return {"error": "token_data is required for creating tokens"}
                return client.post("tokens/", token_data)
            elif action == "delete_token":
                if not token_id:
                    return {"error": "token_id is required for deleting tokens"}
                return client.delete(f"tokens/{token_id}/")

            # Session and Profile Operations
            elif action == "get_me":
                return client.get("me/")
            elif action == "get_session":
                return client.get("session/")

            # Authenticator Plugin Operations
            elif action == "list_authenticator_plugins":
                params = filters or {}
                return client.get("authenticator_plugins/", params)

            else:
                return {"error": f"Unknown action: {action}"}
        except Exception as e:
            # Tool boundary: report the failure as data instead of letting the
            # exception propagate to the caller.
            return {"error": f"Gateway authentication management failed: {str(e)}"}