create_user
Create a new user in a Keycloak realm with required username and optional email, name, enabled status, email verification, temporary password, and attributes.
Instructions
Create a new user.
Args:
username: Username for the new user
email: Email address
first_name: First name
last_name: Last name
enabled: Whether the user is enabled
email_verified: Whether the email is verified
temporary_password: Initial password (user will be required to change it)
attributes: Additional user attributes
realm: Target realm (uses default if not specified)
Returns:
Dict with status and location of created userInput Schema
| Name | Required | Description | Default |
|---|---|---|---|
| username | Yes | ||
| No | |||
| first_name | No | ||
| last_name | No | ||
| enabled | No | ||
| email_verified | No | ||
| temporary_password | No | ||
| attributes | No | ||
| realm | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- src/tools/user_tools.py:66-117 (handler)The main handler function for the create_user tool. Decorated with @mcp.tool(), it accepts user parameters (username, email, first_name, last_name, enabled, email_verified, temporary_password, attributes, realm), builds a user_data dict, and makes a POST request to Keycloak's /users endpoint to create a new user. Returns a status dict.
@mcp.tool() async def create_user( username: str, email: Optional[str] = None, first_name: Optional[str] = None, last_name: Optional[str] = None, enabled: bool = True, email_verified: bool = False, temporary_password: Optional[str] = None, attributes: Optional[Dict[str, List[str]]] = None, realm: Optional[str] = None, ) -> Dict[str, str]: """ Create a new user. Args: username: Username for the new user email: Email address first_name: First name last_name: Last name enabled: Whether the user is enabled email_verified: Whether the email is verified temporary_password: Initial password (user will be required to change it) attributes: Additional user attributes realm: Target realm (uses default if not specified) Returns: Dict with status and location of created user """ user_data = { "username": username, "enabled": enabled, "emailVerified": email_verified, } if email: user_data["email"] = email if first_name: user_data["firstName"] = first_name if last_name: user_data["lastName"] = last_name if attributes: user_data["attributes"] = attributes if temporary_password: user_data["credentials"] = [ {"type": "password", "value": temporary_password, "temporary": True} ] # Create user returns no content, but includes Location header await client._make_request("POST", "/users", data=user_data, realm=realm) return {"status": "created", "message": f"User {username} created successfully"} - src/common/server.py:1-4 (registration)The FastMCP server instance ('mcp') is created here. The @mcp.tool() decorator on create_user registers it as an MCP tool.
from mcp.server.fastmcp import FastMCP # Initialize FastMCP server mcp = FastMCP("Keycloak MCP Server", dependencies=["dotenv"]) - src/main.py:17-23 (registration)Main entry point imports the tools modules (including user_tools) to ensure all @mcp.tool() decorated functions are registered with the server.
# Import all tool modules to register them with the MCP server from . import tools # noqa: F401 from .tools import user_tools # noqa: F401 from .tools import client_tools # noqa: F401 from .tools import realm_tools # noqa: F401 from .tools import role_tools # noqa: F401 from .tools import group_tools # noqa: F401 - src/tools/keycloak_client.py:7-113 (helper)KeycloakClient class used by create_user to make authenticated HTTP requests to Keycloak's admin API. The _make_request method handles the POST to /users.
class KeycloakClient: def __init__(self): self.server_url = KEYCLOAK_CFG["server_url"] self.username = KEYCLOAK_CFG["username"] self.password = KEYCLOAK_CFG["password"] self.realm_name = ( KEYCLOAK_CFG["realm_name"] if KEYCLOAK_CFG["realm_name"] else DEFAULT_REALM ) self.client_id = KEYCLOAK_CFG["client_id"] self.client_secret = KEYCLOAK_CFG["client_secret"] self.token = None self.refresh_token = None self._client = None async def _ensure_client(self): """Ensure httpx async client exists""" if self._client is None: self._client = httpx.AsyncClient(timeout=DEFAULT_REQUEST_TIMEOUT) return self._client async def _get_token(self) -> str: """Get access token using username and password""" # Try new URL structure first, then fall back to old one token_url = f"{self.server_url}/auth/realms/{self.realm_name}/protocol/openid-connect/token" data = { "grant_type": "password", "username": self.username, "password": self.password, "client_id": "admin-cli", # Using admin-cli for admin operations } client = await self._ensure_client() response = await client.post(token_url, data=data) response.raise_for_status() token_data = response.json() self.token = token_data["access_token"] self.refresh_token = token_data.get("refresh_token") return self.token async def _get_headers(self) -> Dict[str, str]: """Get headers with authorization token""" if not self.token: await self._get_token() return { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json", } async def _make_request( self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None, skip_realm: bool = False, realm: Optional[str] = None, ) -> Any: """Make authenticated request to Keycloak API""" if skip_realm: url = f"{self.server_url}/auth/admin{endpoint}" else: # Use provided realm or fall back to configured realm target_realm = realm if realm is not None else self.realm_name url = f"{self.server_url}/auth/admin/realms/{target_realm}{endpoint}" try: client = await self._ensure_client() headers = await self._get_headers() response = await client.request( method=method, url=url, headers=headers, json=data, params=params, ) # If token expired, refresh and retry if response.status_code == 401: await self._get_token() headers = await self._get_headers() response = await client.request( method=method, url=url, headers=headers, json=data, params=params, ) response.raise_for_status() if response.content: return response.json() return None except httpx.RequestError as e: raise Exception(f"Keycloak API request failed: {str(e)}") async def close(self): """Close the httpx client""" if self._client: await self._client.aclose() self._client = None