#!/usr/bin/env python3
"""
AAP Controller User & Access Management Tool
"""
from typing import Any, Dict, Optional, Union
from fastmcp import FastMCP
from pydantic import Field
from connectors.aap_connector import get_aap_connector
def _coerce_id(value: Optional[Union[int, float]], name: str) -> Optional[int]:
    """Return *value* in a form that is safe to embed in a URL path.

    The tool accepts IDs as ``int`` or ``float``. Formatting a float into an
    f-string yields e.g. ``"5.0"``, which would produce a path such as
    ``users/5.0/``. Integral floats are therefore converted to ``int``.

    Args:
        value: The raw ID as received by the tool, or ``None``.
        name: Parameter name, used only in the error message.

    Returns:
        ``None`` if *value* is ``None``; an ``int`` for integral floats;
        otherwise *value* unchanged.

    Raises:
        ValueError: If *value* is a float with a fractional part (or NaN/inf),
            since silently truncating it could target the wrong object.
    """
    if value is None:
        return None
    if isinstance(value, float):
        if not value.is_integer():
            raise ValueError(f"{name} must be a whole number, got {value}")
        return int(value)
    return value


def register_user_tools(mcp: FastMCP):
    """Register user management tools with the MCP server"""

    @mcp.tool()
    def user_access_management(
        action: str = Field(description="Action: list_users, create_user, update_user, delete_user, list_teams, create_team, update_team, list_roles, assign_role, revoke_role, list_permissions, list_oauth_applications"),
        user_id: Optional[Union[int, float]] = Field(None, description="User ID"),
        team_id: Optional[Union[int, float]] = Field(None, description="Team ID"),
        role_id: Optional[Union[int, float]] = Field(None, description="Role ID"),
        organization_id: Optional[Union[int, float]] = Field(None, description="Organization ID"),
        user_data: Optional[Dict[str, Any]] = Field(None, description="User data"),
        team_data: Optional[Dict[str, Any]] = Field(None, description="Team data"),
        role_data: Optional[Dict[str, Any]] = Field(None, description="Role assignment data"),
        filters: Optional[Dict[str, Any]] = Field(None, description="Filters for listing")
    ) -> Dict[str, Any]:
        """
        User and access management tool.
        Handles users, teams, roles, permissions, and OAuth applications.

        Dispatches on ``action`` and forwards the request to the AAP
        connector returned by ``get_aap_connector()``.

        Args:
            action: Name of the operation to perform (see the field
                description for the accepted values).
            user_id: Target user; required for update_user, delete_user,
                assign_role and revoke_role; optional scope for
                list_permissions.
            team_id: Target team; required for update_team; optional scope
                for list_permissions (used only when ``user_id`` is not set).
            role_id: Role to disassociate; required for revoke_role.
            organization_id: Accepted but not used by any action in this tool.
            user_data: Payload for create_user / update_user.
            team_data: Payload for create_team / update_team.
            role_data: Payload posted for assign_role.
            filters: Query parameters for the list_* actions.

        Returns:
            Whatever the connector call returns, or ``{"error": ...}`` when a
            required argument is missing, the action is unknown, or any
            exception is raised while handling the request.
        """
        try:
            # Normalize IDs first so that 5.0 becomes 5 in URL paths and
            # payloads; a non-integral ID raises and is reported below.
            user_id = _coerce_id(user_id, "user_id")
            team_id = _coerce_id(team_id, "team_id")
            role_id = _coerce_id(role_id, "role_id")

            # User Operations
            if action == "list_users":
                params = filters or {}
                return get_aap_connector().get("users/", params)
            elif action == "create_user":
                if not user_data:
                    return {"error": "user_data is required"}
                return get_aap_connector().post("users/", user_data)
            elif action == "update_user":
                if not user_id or not user_data:
                    return {"error": "user_id and user_data are required"}
                return get_aap_connector().patch(f"users/{user_id}/", user_data)
            elif action == "delete_user":
                if not user_id:
                    return {"error": "user_id is required"}
                return get_aap_connector().delete(f"users/{user_id}/")

            # Team Operations
            elif action == "list_teams":
                params = filters or {}
                return get_aap_connector().get("teams/", params)
            elif action == "create_team":
                if not team_data:
                    return {"error": "team_data is required"}
                return get_aap_connector().post("teams/", team_data)
            elif action == "update_team":
                if not team_id or not team_data:
                    return {"error": "team_id and team_data are required"}
                return get_aap_connector().patch(f"teams/{team_id}/", team_data)

            # Role Operations
            elif action == "list_roles":
                params = filters or {}
                return get_aap_connector().get("roles/", params)
            elif action == "assign_role":
                if not user_id or not role_data:
                    return {"error": "user_id and role_data are required"}
                return get_aap_connector().post(f"users/{user_id}/roles/", role_data)
            elif action == "revoke_role":
                if not user_id or not role_id:
                    return {"error": "user_id and role_id are required"}
                # Same endpoint as assign_role; the "disassociate" flag in the
                # payload is what turns the request into a removal.
                return get_aap_connector().post(
                    f"users/{user_id}/roles/",
                    {"id": role_id, "disassociate": True},
                )

            # Permission Operations
            elif action == "list_permissions":
                # Filters apply to every scope, consistent with the other
                # list_* actions. user_id takes precedence over team_id.
                params = filters or {}
                if user_id:
                    return get_aap_connector().get(f"users/{user_id}/object_roles/", params)
                elif team_id:
                    return get_aap_connector().get(f"teams/{team_id}/object_roles/", params)
                else:
                    return get_aap_connector().get("object_roles/", params)

            # OAuth Operations
            elif action == "list_oauth_applications":
                params = filters or {}
                return get_aap_connector().get("applications/", params)

            else:
                return {"error": f"Unknown action: {action}"}
        except Exception as e:
            # Tool boundary: report failures to the MCP client as data rather
            # than letting the exception propagate.
            return {"error": f"User/Access management failed: {str(e)}"}