Skip to main content
Glama
gateway_auth_management.py13.8 kB
#!/usr/bin/env python3 """ AAP Gateway Authentication & Access Management Tool """ from typing import Any, Dict, Optional, Union from fastmcp import FastMCP from pydantic import Field from connectors.gateway_connector import get_gateway_connector def register_gateway_auth_tools(mcp: FastMCP): """Register Gateway authentication and access management tools with the MCP server""" @mcp.tool() def gateway_auth_management( action: str = Field(description="Action: list_users, create_user, update_user, delete_user, get_user, list_teams, create_team, update_team, delete_team, list_organizations, create_organization, update_organization, delete_organization, list_role_definitions, create_role_definition, update_role_definition, delete_role_definition, list_role_assignments, create_role_assignment, delete_role_assignment, list_authenticators, create_authenticator, update_authenticator, delete_authenticator, list_authenticator_maps, create_authenticator_map, update_authenticator_map, delete_authenticator_map, list_applications, create_application, update_application, delete_application, list_tokens, create_token, delete_token, get_me, get_session, list_authenticator_plugins"), user_id: Optional[Union[int, float]] = Field(None, description="User ID"), team_id: Optional[Union[int, float]] = Field(None, description="Team ID"), organization_id: Optional[Union[int, float]] = Field(None, description="Organization ID"), role_definition_id: Optional[Union[int, float]] = Field(None, description="Role definition ID"), role_assignment_id: Optional[Union[int, float]] = Field(None, description="Role assignment ID"), authenticator_id: Optional[Union[int, float]] = Field(None, description="Authenticator ID"), authenticator_map_id: Optional[Union[int, float]] = Field(None, description="Authenticator map ID"), application_id: Optional[Union[int, float]] = Field(None, description="OAuth application ID"), token_id: Optional[Union[int, float]] = Field(None, description="Token ID"), user_data: Optional[Dict[str, Any]] = Field(None, description="User data"), team_data: Optional[Dict[str, Any]] = Field(None, description="Team data"), organization_data: Optional[Dict[str, Any]] = Field(None, description="Organization data"), role_definition_data: Optional[Dict[str, Any]] = Field(None, description="Role definition data"), role_assignment_data: Optional[Dict[str, Any]] = Field(None, description="Role assignment data"), authenticator_data: Optional[Dict[str, Any]] = Field(None, description="Authenticator data"), authenticator_map_data: Optional[Dict[str, Any]] = Field(None, description="Authenticator map data"), application_data: Optional[Dict[str, Any]] = Field(None, description="OAuth application data"), token_data: Optional[Dict[str, Any]] = Field(None, description="Token data"), filters: Optional[Dict[str, Any]] = Field(None, description="Filters for listing") ) -> Dict[str, Any]: """ Gateway authentication and access management tool. Handles users, teams, organizations, roles, authenticators, and OAuth applications. """ try: client = get_gateway_connector() # User Operations if action == "list_users": params = filters or {} return client.get("users/", params) elif action == "create_user": if not user_data: return {"error": "user_data is required for creating users"} return client.post("users/", user_data) elif action == "get_user": if not user_id: return {"error": "user_id is required for getting user details"} return client.get(f"users/{user_id}/") elif action == "update_user": if not user_id or not user_data: return {"error": "user_id and user_data are required for updating users"} return client.patch(f"users/{user_id}/", user_data) elif action == "delete_user": if not user_id: return {"error": "user_id is required for deleting users"} return client.delete(f"users/{user_id}/") # Team Operations elif action == "list_teams": params = filters or {} return client.get("teams/", params) elif action == "create_team": if not team_data: return {"error": "team_data is required for creating teams"} return client.post("teams/", team_data) elif action == "update_team": if not team_id or not team_data: return {"error": "team_id and team_data are required for updating teams"} return client.patch(f"teams/{team_id}/", team_data) elif action == "delete_team": if not team_id: return {"error": "team_id is required for deleting teams"} return client.delete(f"teams/{team_id}/") # Organization Operations elif action == "list_organizations": params = filters or {} return client.get("organizations/", params) elif action == "create_organization": if not organization_data: return {"error": "organization_data is required for creating organizations"} return client.post("organizations/", organization_data) elif action == "update_organization": if not organization_id or not organization_data: return {"error": "organization_id and organization_data are required"} return client.patch(f"organizations/{organization_id}/", organization_data) elif action == "delete_organization": if not organization_id: return {"error": "organization_id is required for deleting organizations"} return client.delete(f"organizations/{organization_id}/") # Role Definition Operations elif action == "list_role_definitions": params = filters or {} return client.get("role_definitions/", params) elif action == "create_role_definition": if not role_definition_data: return {"error": "role_definition_data is required for creating role definitions"} return client.post("role_definitions/", role_definition_data) elif action == "update_role_definition": if not role_definition_id or not role_definition_data: return {"error": "role_definition_id and role_definition_data are required"} return client.patch(f"role_definitions/{role_definition_id}/", role_definition_data) elif action == "delete_role_definition": if not role_definition_id: return {"error": "role_definition_id is required for deleting role definitions"} return client.delete(f"role_definitions/{role_definition_id}/") # Role Assignment Operations elif action == "list_role_assignments": # Check both user and team assignments user_assignments = client.get("role_user_assignments/", filters or {}) team_assignments = client.get("role_team_assignments/", filters or {}) return { "user_assignments": user_assignments, "team_assignments": team_assignments } elif action == "create_role_assignment": if not role_assignment_data: return {"error": "role_assignment_data is required for creating role assignments"} # Determine if it's user or team assignment based on data if "user" in role_assignment_data: return client.post("role_user_assignments/", role_assignment_data) elif "team" in role_assignment_data: return client.post("role_team_assignments/", role_assignment_data) else: return {"error": "role_assignment_data must specify either 'user' or 'team'"} elif action == "delete_role_assignment": if not role_assignment_id: return {"error": "role_assignment_id is required for deleting role assignments"} # Try both endpoints (user and team assignments) try: return client.delete(f"role_user_assignments/{role_assignment_id}/") except: return client.delete(f"role_team_assignments/{role_assignment_id}/") # Authenticator Operations elif action == "list_authenticators": params = filters or {} return client.get("authenticators/", params) elif action == "create_authenticator": if not authenticator_data: return {"error": "authenticator_data is required for creating authenticators"} return client.post("authenticators/", authenticator_data) elif action == "update_authenticator": if not authenticator_id or not authenticator_data: return {"error": "authenticator_id and authenticator_data are required"} return client.patch(f"authenticators/{authenticator_id}/", authenticator_data) elif action == "delete_authenticator": if not authenticator_id: return {"error": "authenticator_id is required for deleting authenticators"} return client.delete(f"authenticators/{authenticator_id}/") # Authenticator Map Operations elif action == "list_authenticator_maps": params = filters or {} return client.get("authenticator_maps/", params) elif action == "create_authenticator_map": if not authenticator_map_data: return {"error": "authenticator_map_data is required for creating authenticator maps"} return client.post("authenticator_maps/", authenticator_map_data) elif action == "update_authenticator_map": if not authenticator_map_id or not authenticator_map_data: return {"error": "authenticator_map_id and authenticator_map_data are required"} return client.patch(f"authenticator_maps/{authenticator_map_id}/", authenticator_map_data) elif action == "delete_authenticator_map": if not authenticator_map_id: return {"error": "authenticator_map_id is required for deleting authenticator maps"} return client.delete(f"authenticator_maps/{authenticator_map_id}/") # OAuth Application Operations elif action == "list_applications": params = filters or {} return client.get("applications/", params) elif action == "create_application": if not application_data: return {"error": "application_data is required for creating OAuth applications"} return client.post("applications/", application_data) elif action == "update_application": if not application_id or not application_data: return {"error": "application_id and application_data are required"} return client.patch(f"applications/{application_id}/", application_data) elif action == "delete_application": if not application_id: return {"error": "application_id is required for deleting OAuth applications"} return client.delete(f"applications/{application_id}/") # Token Operations elif action == "list_tokens": params = filters or {} return client.get("tokens/", params) elif action == "create_token": if not token_data: return {"error": "token_data is required for creating tokens"} return client.post("tokens/", token_data) elif action == "delete_token": if not token_id: return {"error": "token_id is required for deleting tokens"} return client.delete(f"tokens/{token_id}/") # Session and Profile Operations elif action == "get_me": return client.get("me/") elif action == "get_session": return client.get("session/") # Authenticator Plugin Operations elif action == "list_authenticator_plugins": params = filters or {} return client.get("authenticator_plugins/", params) else: return {"error": f"Unknown action: {action}"} except Exception as e: return {"error": f"Gateway authentication management failed: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anshulbehl/aap-mcp-pilot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server