Skip to main content
Glama
users.py53.9 kB
""" OPNsense MCP Server - Users Domain This module provides comprehensive user and group management capabilities including: - User CRUD operations (create, read, update, delete, toggle) - Group management with privilege assignment - Group membership management - Authentication and privilege system - External authentication server support - Helper tools for common scenarios (admin users, read-only users, password resets) - Bulk user creation and group templates The module implements full RBAC (Role-Based Access Control) support with: - Direct user privilege assignment - Group-based privilege inheritance - Effective privilege calculation (combines user + group privileges) - Authentication testing against local and external auth servers """ import json import logging from typing import Optional from mcp.server.fastmcp import Context from ..main import mcp from ..core import ( ValidationError, ResourceNotFoundError, ) from ..shared.constants import ( API_CORE_USER_SEARCH, API_CORE_USER_GET, API_CORE_USER_ADD, API_CORE_USER_SET, API_CORE_USER_DEL, API_CORE_USER_TOGGLE, API_CORE_GROUP_SEARCH, API_CORE_GROUP_GET, API_CORE_GROUP_ADD, API_CORE_GROUP_SET, API_CORE_GROUP_DEL, API_CORE_AUTH_PRIVILEGES, API_CORE_AUTH_SERVERS, API_CORE_AUTH_TEST, API_CORE_CONFIG_RELOAD, ) from ..shared.error_handlers import handle_tool_error, validate_uuid logger = logging.getLogger("opnsense-mcp") # ========== HELPER FUNCTIONS ========== async def get_opnsense_client(): """Get configured OPNsense client from server state.""" from ..domains.configuration import get_opnsense_client as get_client return await get_client() # ========== USER MANAGEMENT ========== @mcp.tool(name="list_users", description="List all users in OPNsense") async def list_users(ctx: Context) -> str: """List all users configured in OPNsense. Args: ctx: MCP context Returns: JSON string with list of all users """ try: client = await get_opnsense_client() response = await client.request("POST", API_CORE_USER_SEARCH, operation="list_users") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_users", e) @mcp.tool(name="get_user", description="Get details of a specific user") async def get_user(ctx: Context, user_uuid: Optional[str] = None) -> str: """Get details of a specific user or all users. Args: ctx: MCP context user_uuid: UUID of specific user to retrieve (optional - if not provided, returns all users) Returns: JSON string with user details """ try: client = await get_opnsense_client() # Validate UUID if provided if user_uuid: validate_uuid(user_uuid, "user_uuid") endpoint = f"{API_CORE_USER_GET}/{user_uuid}" else: endpoint = API_CORE_USER_GET response = await client.request("GET", endpoint, operation="get_user") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_user", e) @mcp.tool(name="create_user", description="Create a new user account") async def create_user( ctx: Context, username: str, password: str, full_name: str = "", email: str = "", groups: Optional[str] = None, privileges: Optional[str] = None, enabled: bool = True, expires: Optional[str] = None, comment: str = "" ) -> str: """Create a new user account in OPNsense. **SECURITY WARNING:** Passwords are transmitted in plaintext to OPNsense and hashed server-side. Always use strong, unique passwords (minimum 12+ characters with mixed case, numbers, symbols). Never reuse passwords across accounts or services. Args: ctx: MCP context username: Unique username for the account password: Password for the user (will be hashed server-side) WARNING: Use strong passwords (12+ chars, mixed case, numbers, symbols) full_name: Full name of the user (optional) email: Email address of the user (optional) groups: Comma-separated list of group names (optional) privileges: Comma-separated list of privilege names (optional) enabled: Whether the account should be enabled (default: True) expires: Expiration date in YYYY-MM-DD format (optional) comment: Additional comments about the user (optional) Returns: JSON string with creation result and new user UUID """ try: client = await get_opnsense_client() # Validate required parameters if not username or not password: raise ValidationError("Username and password are required", context={"username": username, "has_password": bool(password)}) if len(username) < 3: raise ValidationError("Username must be at least 3 characters long", context={"username": username}) if len(password) < 6: raise ValidationError("Password must be at least 6 characters long") # Prepare user data user_data = { "user": { "enabled": "1" if enabled else "0", "name": username, "password": password, "full_name": full_name, "email": email, "comment": comment } } # Add groups if specified if groups: # Convert comma-separated string to list and validate group names group_list = [g.strip() for g in groups.split(",") if g.strip()] user_data["user"]["groups"] = ",".join(group_list) # Add privileges if specified if privileges: # Convert comma-separated string to list priv_list = [p.strip() for p in privileges.split(",") if p.strip()] user_data["user"]["priv"] = ",".join(priv_list) # Add expiration if specified if expires: # Basic date format validation import re if not re.match(r'^\d{4}-\d{2}-\d{2}$', expires): raise ValidationError("Expires must be in YYYY-MM-DD format", context={"expires": expires}) user_data["user"]["expires"] = expires # Create the user response = await client.request("POST", API_CORE_USER_ADD, data=user_data, operation="create_user") # Reload configuration if creation was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_user_create") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_user", e) @mcp.tool(name="update_user", description="Update an existing user account") async def update_user( ctx: Context, user_uuid: str, username: Optional[str] = None, password: Optional[str] = None, full_name: Optional[str] = None, email: Optional[str] = None, groups: Optional[str] = None, privileges: Optional[str] = None, enabled: Optional[bool] = None, expires: Optional[str] = None, comment: Optional[str] = None ) -> str: """Update an existing user account in OPNsense. **SECURITY WARNING:** When updating passwords, they are transmitted in plaintext to OPNsense. Always use strong, unique passwords (minimum 12+ characters with mixed case, numbers, symbols). Never reuse passwords across accounts or services. Args: ctx: MCP context user_uuid: UUID of the user to update username: New username (optional) password: New password (optional) WARNING: Use strong passwords (12+ chars, mixed case, numbers, symbols) full_name: New full name (optional) email: New email address (optional) groups: Comma-separated list of group names (optional) privileges: Comma-separated list of privilege names (optional) enabled: Whether the account should be enabled (optional) expires: Expiration date in YYYY-MM-DD format (optional) comment: Additional comments about the user (optional) Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(user_uuid, "user_uuid") # Get current user configuration current_user_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_for_update") if "user" not in current_user_response: raise ResourceNotFoundError(f"User with UUID {user_uuid} not found") current_user = current_user_response["user"] # Update only provided fields if username is not None: if len(username) < 3: raise ValidationError("Username must be at least 3 characters long", context={"username": username}) current_user["name"] = username if password is not None: if len(password) < 6: raise ValidationError("Password must be at least 6 characters long") current_user["password"] = password if full_name is not None: current_user["full_name"] = full_name if email is not None: current_user["email"] = email if groups is not None: # Convert comma-separated string to list and validate group names group_list = [g.strip() for g in groups.split(",") if g.strip()] current_user["groups"] = ",".join(group_list) if privileges is not None: # Convert comma-separated string to list priv_list = [p.strip() for p in privileges.split(",") if p.strip()] current_user["priv"] = ",".join(priv_list) if enabled is not None: current_user["enabled"] = "1" if enabled else "0" if expires is not None: if expires: # Only validate if not empty import re if not re.match(r'^\d{4}-\d{2}-\d{2}$', expires): raise ValidationError("Expires must be in YYYY-MM-DD format or empty", context={"expires": expires}) current_user["expires"] = expires if comment is not None: current_user["comment"] = comment # Prepare update data user_data = {"user": current_user} # Update the user response = await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=user_data, operation="update_user") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_user_update") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "update_user", e) @mcp.tool(name="delete_user", description="Delete a user account") async def delete_user(ctx: Context, user_uuid: str) -> str: """Delete a user account from OPNsense. This will remove the user account and all associated data including API keys. Args: ctx: MCP context user_uuid: UUID of the user to delete Returns: JSON string with deletion result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(user_uuid, "user_uuid") # Delete the user response = await client.request("POST", f"{API_CORE_USER_DEL}/{user_uuid}", operation="delete_user") # Reload configuration if deletion was successful if response.get("result") == "deleted": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_user_delete") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_user", e) @mcp.tool(name="toggle_user", description="Enable or disable a user account") async def toggle_user(ctx: Context, user_uuid: str, enabled: bool) -> str: """Enable or disable a user account. Args: ctx: MCP context user_uuid: UUID of the user to toggle enabled: True to enable, False to disable Returns: JSON string with toggle result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(user_uuid, "user_uuid") # Toggle the user enabled_int = 1 if enabled else 0 response = await client.request("POST", f"{API_CORE_USER_TOGGLE}/{user_uuid}/{enabled_int}", operation="toggle_user") # Reload configuration if toggle was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_user_toggle") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "toggle_user", e) # ========== GROUP MANAGEMENT ========== @mcp.tool(name="list_groups", description="List all groups in OPNsense") async def list_groups(ctx: Context) -> str: """List all groups configured in OPNsense. Args: ctx: MCP context Returns: JSON string with list of all groups """ try: client = await get_opnsense_client() response = await client.request("POST", API_CORE_GROUP_SEARCH, operation="list_groups") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_groups", e) @mcp.tool(name="get_group", description="Get details of a specific group") async def get_group(ctx: Context, group_uuid: Optional[str] = None) -> str: """Get details of a specific group or all groups. Args: ctx: MCP context group_uuid: UUID of specific group to retrieve (optional - if not provided, returns all groups) Returns: JSON string with group details """ try: client = await get_opnsense_client() # Validate UUID if provided if group_uuid: validate_uuid(group_uuid, "group_uuid") endpoint = f"{API_CORE_GROUP_GET}/{group_uuid}" else: endpoint = API_CORE_GROUP_GET response = await client.request("GET", endpoint, operation="get_group") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_group", e) @mcp.tool(name="create_group", description="Create a new group") async def create_group( ctx: Context, name: str, description: str = "", privileges: Optional[str] = None, members: Optional[str] = None ) -> str: """Create a new group in OPNsense. Args: ctx: MCP context name: Name of the group (must be unique) description: Description of the group (optional) privileges: Comma-separated list of privilege names (optional) members: Comma-separated list of usernames to add to group (optional) Returns: JSON string with creation result and new group UUID """ try: client = await get_opnsense_client() # Validate required parameters if not name: raise ValidationError("Group name is required", context={"name": name}) if len(name) < 2: raise ValidationError("Group name must be at least 2 characters long", context={"name": name}) # Prepare group data group_data = { "group": { "name": name, "description": description } } # Add privileges if specified if privileges: # Convert comma-separated string to list priv_list = [p.strip() for p in privileges.split(",") if p.strip()] group_data["group"]["priv"] = ",".join(priv_list) # Add members if specified if members: # Convert comma-separated string to list and validate usernames member_list = [m.strip() for m in members.split(",") if m.strip()] group_data["group"]["member"] = ",".join(member_list) # Create the group response = await client.request("POST", API_CORE_GROUP_ADD, data=group_data, operation="create_group") # Reload configuration if creation was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_create") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_group", e) @mcp.tool(name="update_group", description="Update an existing group") async def update_group( ctx: Context, group_uuid: str, name: Optional[str] = None, description: Optional[str] = None, privileges: Optional[str] = None, members: Optional[str] = None ) -> str: """Update an existing group in OPNsense. Args: ctx: MCP context group_uuid: UUID of the group to update name: New name for the group (optional) description: New description for the group (optional) privileges: Comma-separated list of privilege names (optional) members: Comma-separated list of usernames in group (optional) Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(group_uuid, "group_uuid") # Get current group configuration current_group_response = await client.request("GET", f"{API_CORE_GROUP_GET}/{group_uuid}", operation="get_group_for_update") if "group" not in current_group_response: raise ResourceNotFoundError(f"Group with UUID {group_uuid} not found") current_group = current_group_response["group"] # Update only provided fields if name is not None: if len(name) < 2: raise ValidationError("Group name must be at least 2 characters long", context={"name": name}) current_group["name"] = name if description is not None: current_group["description"] = description if privileges is not None: # Convert comma-separated string to list priv_list = [p.strip() for p in privileges.split(",") if p.strip()] current_group["priv"] = ",".join(priv_list) if members is not None: # Convert comma-separated string to list and validate usernames member_list = [m.strip() for m in members.split(",") if m.strip()] current_group["member"] = ",".join(member_list) # Prepare update data group_data = {"group": current_group} # Update the group response = await client.request("POST", f"{API_CORE_GROUP_SET}/{group_uuid}", data=group_data, operation="update_group") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_update") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "update_group", e) @mcp.tool(name="delete_group", description="Delete a group") async def delete_group(ctx: Context, group_uuid: str) -> str: """Delete a group from OPNsense. This will remove the group and update all users who were members of this group. Args: ctx: MCP context group_uuid: UUID of the group to delete Returns: JSON string with deletion result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(group_uuid, "group_uuid") # Delete the group response = await client.request("POST", f"{API_CORE_GROUP_DEL}/{group_uuid}", operation="delete_group") # Reload configuration if deletion was successful if response.get("result") == "deleted": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_delete") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "delete_group", e) @mcp.tool(name="add_user_to_group", description="Add a user to a group") async def add_user_to_group(ctx: Context, group_uuid: str, username: str) -> str: """Add a user to an existing group. Args: ctx: MCP context group_uuid: UUID of the group to modify username: Username to add to the group Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(group_uuid, "group_uuid") if not username: raise ValidationError("Username is required", context={"username": username}) # Get current group configuration current_group_response = await client.request("GET", f"{API_CORE_GROUP_GET}/{group_uuid}", operation="get_group_for_member_add") if "group" not in current_group_response: raise ResourceNotFoundError(f"Group with UUID {group_uuid} not found") current_group = current_group_response["group"] # Get current members current_members = [] if "member" in current_group and current_group["member"]: current_members = [m.strip() for m in current_group["member"].split(",") if m.strip()] # Check if user is already a member if username in current_members: return json.dumps({"result": "no_change", "message": f"User '{username}' is already a member of the group"}, indent=2) # Add the new member current_members.append(username) current_group["member"] = ",".join(current_members) # Prepare update data group_data = {"group": current_group} # Update the group response = await client.request("POST", f"{API_CORE_GROUP_SET}/{group_uuid}", data=group_data, operation="add_user_to_group") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_member_add") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "add_user_to_group", e) @mcp.tool(name="remove_user_from_group", description="Remove a user from a group") async def remove_user_from_group(ctx: Context, group_uuid: str, username: str) -> str: """Remove a user from an existing group. Args: ctx: MCP context group_uuid: UUID of the group to modify username: Username to remove from the group Returns: JSON string with update result """ try: client = await get_opnsense_client() # Validate UUID validate_uuid(group_uuid, "group_uuid") if not username: raise ValidationError("Username is required", context={"username": username}) # Get current group configuration current_group_response = await client.request("GET", f"{API_CORE_GROUP_GET}/{group_uuid}", operation="get_group_for_member_remove") if "group" not in current_group_response: raise ResourceNotFoundError(f"Group with UUID {group_uuid} not found") current_group = current_group_response["group"] # Get current members current_members = [] if "member" in current_group and current_group["member"]: current_members = [m.strip() for m in current_group["member"].split(",") if m.strip()] # Check if user is actually a member if username not in current_members: return json.dumps({"result": "no_change", "message": f"User '{username}' is not a member of the group"}, indent=2) # Remove the member current_members.remove(username) current_group["member"] = ",".join(current_members) # Prepare update data group_data = {"group": current_group} # Update the group response = await client.request("POST", f"{API_CORE_GROUP_SET}/{group_uuid}", data=group_data, operation="remove_user_from_group") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_group_member_remove") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "remove_user_from_group", e) # ========== AUTHENTICATION & PRIVILEGE MANAGEMENT ========== @mcp.tool(name="list_privileges", description="List all available privileges in OPNsense") async def list_privileges(ctx: Context) -> str: """List all available privileges and their descriptions in OPNsense. Args: ctx: MCP context Returns: JSON string with list of all available privileges """ try: client = await get_opnsense_client() response = await client.request("GET", API_CORE_AUTH_PRIVILEGES, operation="list_privileges") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_privileges", e) @mcp.tool(name="get_user_effective_privileges", description="Get effective privileges for a user") async def get_user_effective_privileges(ctx: Context, username: str) -> str: """Get the effective privileges for a specific user (combines user and group privileges). Args: ctx: MCP context username: Username to get privileges for Returns: JSON string with user's effective privileges """ try: client = await get_opnsense_client() if not username: raise ValidationError("Username is required", context={"username": username}) # First get the user details to find their UUID users_response = await client.request("POST", API_CORE_USER_SEARCH, operation="search_user_for_privileges") user_uuid = None if "rows" in users_response: for user in users_response["rows"]: if user.get("name") == username: user_uuid = user.get("uuid") break if not user_uuid: raise ResourceNotFoundError(f"User '{username}' not found") # Get detailed user information including groups and privileges user_details_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_privileges") if "user" not in user_details_response: raise ResourceNotFoundError(f"User details for '{username}' not found") user_details = user_details_response["user"] # Collect all privileges effective_privileges = set() # Add direct user privileges if "priv" in user_details and user_details["priv"]: user_privs = [p.strip() for p in user_details["priv"].split(",") if p.strip()] effective_privileges.update(user_privs) # Add group privileges if "groups" in user_details and user_details["groups"]: group_names = [g.strip() for g in user_details["groups"].split(",") if g.strip()] # Get all groups to find UUIDs and privileges groups_response = await client.request("POST", API_CORE_GROUP_SEARCH, operation="search_groups_for_user_privileges") if "rows" in groups_response: for group in groups_response["rows"]: if group.get("name") in group_names: # Get detailed group information group_uuid = group.get("uuid") if group_uuid: group_details_response = await client.request("GET", f"{API_CORE_GROUP_GET}/{group_uuid}", operation="get_group_privileges") if "group" in group_details_response: group_details = group_details_response["group"] if "priv" in group_details and group_details["priv"]: group_privs = [p.strip() for p in group_details["priv"].split(",") if p.strip()] effective_privileges.update(group_privs) # Format result result = { "username": username, "user_uuid": user_uuid, "direct_privileges": [p.strip() for p in user_details.get("priv", "").split(",") if p.strip()], "group_memberships": [g.strip() for g in user_details.get("groups", "").split(",") if g.strip()], "effective_privileges": sorted(list(effective_privileges)), "privilege_count": len(effective_privileges) } return json.dumps(result, indent=2) except Exception as e: return await handle_tool_error(ctx, "get_user_effective_privileges", e) @mcp.tool(name="assign_privilege_to_user", description="Assign a privilege directly to a user") async def assign_privilege_to_user(ctx: Context, username: str, privilege: str) -> str: """Assign a specific privilege directly to a user. Args: ctx: MCP context username: Username to assign privilege to privilege: Privilege name to assign Returns: JSON string with assignment result """ try: client = await get_opnsense_client() if not username or not privilege: raise ValidationError("Username and privilege are required", context={"username": username, "privilege": privilege}) # Find the user users_response = await client.request("POST", API_CORE_USER_SEARCH, operation="search_user_for_privilege_assignment") user_uuid = None if "rows" in users_response: for user in users_response["rows"]: if user.get("name") == username: user_uuid = user.get("uuid") break if not user_uuid: raise ResourceNotFoundError(f"User '{username}' not found") # Get current user details user_details_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_for_privilege_assignment") if "user" not in user_details_response: raise ResourceNotFoundError(f"User details for '{username}' not found") current_user = user_details_response["user"] # Get current privileges current_privileges = [] if "priv" in current_user and current_user["priv"]: current_privileges = [p.strip() for p in current_user["priv"].split(",") if p.strip()] # Check if privilege is already assigned if privilege in current_privileges: return json.dumps({"result": "no_change", "message": f"Privilege '{privilege}' is already assigned to user '{username}'"}, indent=2) # Add the new privilege current_privileges.append(privilege) current_user["priv"] = ",".join(current_privileges) # Update the user user_data = {"user": current_user} response = await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=user_data, operation="assign_privilege_to_user") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_privilege_assignment") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "assign_privilege_to_user", e) @mcp.tool(name="revoke_privilege_from_user", description="Revoke a privilege directly from a user") async def revoke_privilege_from_user(ctx: Context, username: str, privilege: str) -> str: """Revoke a specific privilege directly from a user. Args: ctx: MCP context username: Username to revoke privilege from privilege: Privilege name to revoke Returns: JSON string with revocation result """ try: client = await get_opnsense_client() if not username or not privilege: raise ValidationError("Username and privilege are required", context={"username": username, "privilege": privilege}) # Find the user users_response = await client.request("POST", API_CORE_USER_SEARCH, operation="search_user_for_privilege_revocation") user_uuid = None if "rows" in users_response: for user in users_response["rows"]: if user.get("name") == username: user_uuid = user.get("uuid") break if not user_uuid: raise ResourceNotFoundError(f"User '{username}' not found") # Get current user details user_details_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_for_privilege_revocation") if "user" not in user_details_response: raise ResourceNotFoundError(f"User details for '{username}' not found") current_user = user_details_response["user"] # Get current privileges current_privileges = [] if "priv" in current_user and current_user["priv"]: current_privileges = [p.strip() for p in current_user["priv"].split(",") if p.strip()] # Check if privilege is actually assigned if privilege not in current_privileges: return json.dumps({"result": "no_change", "message": f"Privilege '{privilege}' is not assigned to user '{username}'"}, indent=2) # Remove the privilege current_privileges.remove(privilege) current_user["priv"] = ",".join(current_privileges) # Update the user user_data = {"user": current_user} response = await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=user_data, operation="revoke_privilege_from_user") # Reload configuration if update was successful if response.get("result") == "saved": await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_privilege_revocation") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "revoke_privilege_from_user", e) @mcp.tool(name="list_auth_servers", description="List configured authentication servers") async def list_auth_servers(ctx: Context) -> str: """List all configured authentication servers (LDAP, RADIUS, etc.). Args: ctx: MCP context Returns: JSON string with list of authentication servers """ try: client = await get_opnsense_client() response = await client.request("GET", API_CORE_AUTH_SERVERS, operation="list_auth_servers") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "list_auth_servers", e) @mcp.tool(name="test_user_authentication", description="Test user authentication against configured servers") async def test_user_authentication(ctx: Context, username: str, auth_server: Optional[str] = None) -> str: """Test user authentication against a specific authentication server or all servers. Args: ctx: MCP context username: Username to test authentication for auth_server: Specific authentication server to test against (optional) Returns: JSON string with authentication test results """ try: client = await get_opnsense_client() if not username: raise ValidationError("Username is required", context={"username": username}) # Prepare test data test_data = { "username": username } if auth_server: test_data["auth_server"] = auth_server response = await client.request("POST", API_CORE_AUTH_TEST, data=test_data, operation="test_user_authentication") return json.dumps(response, indent=2) except Exception as e: return await handle_tool_error(ctx, "test_user_authentication", e) # ========== USER MANAGEMENT HELPER TOOLS ========== @mcp.tool(name="create_admin_user", description="Create a new administrative user with full system privileges") async def create_admin_user(ctx: Context, username: str, password: str, full_name: str = "", email: str = "") -> str: """Create a new administrative user with full system privileges. Args: ctx: MCP context username: Username for the new admin user password: Password for the user full_name: Full name of the user (optional) email: Email address of the user (optional) Returns: JSON response with creation status and user details """ try: client = await get_opnsense_client() # Create the user first user_data = { "user": { "name": username, "password": password, "full_name": full_name or username, "email": email, "disabled": "0", "expires": "", "comment": "Administrative user created via MCP" } } response = await client.request("POST", API_CORE_USER_ADD, data=user_data, operation="create_admin_user") if response.get("result") != "saved": return json.dumps({"error": "Failed to create user", "response": response}, indent=2) user_uuid = response.get("uuid") if not user_uuid: return json.dumps({"error": "User created but UUID not returned", "response": response}, indent=2) # Get all available privileges privileges_response = await client.request("GET", API_CORE_AUTH_PRIVILEGES, operation="get_privileges_for_admin") if "privileges" not in privileges_response: return json.dumps({ "user_created": True, "uuid": user_uuid, "warning": "User created but could not retrieve privileges for assignment" }, indent=2) # Assign all privileges to make this a full admin all_privileges = list(privileges_response["privileges"].keys()) privilege_string = ",".join(all_privileges) # Update user with all privileges update_data = { "user": { "name": username, "password": password, "full_name": full_name or username, "email": email, "disabled": "0", "expires": "", "comment": "Administrative user created via MCP", "priv": privilege_string } } await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=update_data, operation="assign_admin_privileges") # Reload configuration await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_admin_creation") return json.dumps({ "result": "success", "message": f"Administrative user '{username}' created successfully", "uuid": user_uuid, "privileges_assigned": len(all_privileges), "full_admin": True }, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_admin_user", e) @mcp.tool(name="create_readonly_user", description="Create a new read-only user with limited system access") async def create_readonly_user(ctx: Context, username: str, password: str, full_name: str = "", email: str = "") -> str: """Create a new read-only user with limited system access. Args: ctx: MCP context username: Username for the new read-only user password: Password for the user full_name: Full name of the user (optional) email: Email address of the user (optional) Returns: JSON response with creation status and user details """ try: client = await get_opnsense_client() # Define read-only privileges (common monitoring/viewing privileges) readonly_privileges = [ "page-all", # Basic page access "page-status-system", # System status "page-status-interfaces", # Interface status "page-status-logs", # Log viewing "page-diagnostics-all", # Diagnostic tools "page-status-dashboard", # Dashboard access "page-firewall-rules", # Firewall rule viewing (read-only) "page-interfaces-overview" # Interface overview ] # Create user with read-only privileges user_data = { "user": { "name": username, "password": password, "full_name": full_name or username, "email": email, "disabled": "0", "expires": "", "comment": "Read-only user created via MCP", "priv": ",".join(readonly_privileges) } } response = await client.request("POST", API_CORE_USER_ADD, data=user_data, operation="create_readonly_user") if response.get("result") != "saved": return json.dumps({"error": "Failed to create read-only user", "response": response}, indent=2) # Reload configuration await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_readonly_creation") return json.dumps({ "result": "success", "message": f"Read-only user '{username}' created successfully", "uuid": response.get("uuid"), "privileges_assigned": readonly_privileges, "access_level": "read-only" }, indent=2) except Exception as e: return await handle_tool_error(ctx, "create_readonly_user", e) @mcp.tool(name="reset_user_password", description="Reset a user's password by username") async def reset_user_password(ctx: Context, username: str, new_password: str) -> str: """Reset a user's password by username. Args: ctx: MCP context username: Username of the user whose password to reset new_password: New password to set Returns: JSON response with password reset status """ try: client = await get_opnsense_client() # First, find the user by username search_response = await client.request("POST", API_CORE_USER_SEARCH, operation="search_user_for_password_reset") if "rows" not in search_response: return json.dumps({"error": "Failed to retrieve user list"}, indent=2) user_uuid = None user_data = None for user in search_response["rows"]: if user.get("name") == username: user_uuid = user.get("uuid") user_data = user break if not user_uuid: return json.dumps({"error": f"User '{username}' not found"}, indent=2) # Get full user details user_detail_response = await client.request("GET", f"{API_CORE_USER_GET}/{user_uuid}", operation="get_user_details_for_password_reset") if "user" not in user_detail_response: return json.dumps({"error": "Failed to retrieve user details"}, indent=2) current_user = user_detail_response["user"] # Update user with new password (preserve all other settings) update_data = { "user": { "name": current_user.get("name", username), "password": new_password, "full_name": current_user.get("full_name", ""), "email": current_user.get("email", ""), "disabled": current_user.get("disabled", "0"), "expires": current_user.get("expires", ""), "comment": current_user.get("comment", ""), "priv": current_user.get("priv", ""), "groups": current_user.get("groups", "") } } response = await client.request("POST", f"{API_CORE_USER_SET}/{user_uuid}", data=update_data, operation="reset_user_password") if response.get("result") != "saved": return json.dumps({"error": "Failed to reset password", "response": response}, indent=2) # Reload configuration await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_password_reset") return json.dumps({ "result": "success", "message": f"Password successfully reset for user '{username}'", "uuid": user_uuid }, indent=2) except Exception as e: return await handle_tool_error(ctx, "reset_user_password", e) @mcp.tool(name="bulk_user_creation", description="Create multiple users from a template specification") async def bulk_user_creation(ctx: Context, user_template: str) -> str: """Create multiple users from a template specification. Args: ctx: MCP context user_template: JSON string containing user template and list of users to create. Format: { "template": { "password": "default_password", "privileges": ["priv1", "priv2"], "groups": ["group1"], "expires": "", "disabled": "0" }, "users": [ {"username": "user1", "full_name": "User One", "email": "user1@example.com"}, {"username": "user2", "full_name": "User Two", "email": "user2@example.com"} ] } Returns: JSON response with bulk creation results """ try: client = await get_opnsense_client() # Parse the template try: template_data = json.loads(user_template) except json.JSONDecodeError as e: return json.dumps({"error": f"Invalid JSON template: {str(e)}"}, indent=2) if "template" not in template_data or "users" not in template_data: return json.dumps({"error": "Template must contain 'template' and 'users' sections"}, indent=2) template = template_data["template"] users_to_create = template_data["users"] results = [] successful_creations = 0 for user_spec in users_to_create: try: username = user_spec.get("username") if not username: results.append({"error": "Username required for each user", "user_spec": user_spec}) continue # Build user data from template and user-specific overrides user_data = { "user": { "name": username, "password": user_spec.get("password", template.get("password", "")), "full_name": user_spec.get("full_name", template.get("full_name", username)), "email": user_spec.get("email", template.get("email", "")), "disabled": user_spec.get("disabled", template.get("disabled", "0")), "expires": user_spec.get("expires", template.get("expires", "")), "comment": user_spec.get("comment", template.get("comment", "Bulk created via MCP")), "priv": ",".join(user_spec.get("privileges", template.get("privileges", []))), "groups": ",".join(user_spec.get("groups", template.get("groups", []))) } } response = await client.request("POST", API_CORE_USER_ADD, data=user_data, operation=f"bulk_create_user_{username}") if response.get("result") == "saved": results.append({ "username": username, "status": "success", "uuid": response.get("uuid") }) successful_creations += 1 else: results.append({ "username": username, "status": "failed", "error": response.get("validations", "Unknown error") }) except Exception as user_error: results.append({ "username": user_spec.get("username", "unknown"), "status": "failed", "error": str(user_error) }) # Reload configuration if any users were created if successful_creations > 0: await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_bulk_creation") return json.dumps({ "result": "completed", "total_users": len(users_to_create), "successful_creations": successful_creations, "failed_creations": len(users_to_create) - successful_creations, "details": results }, indent=2) except Exception as e: return await handle_tool_error(ctx, "bulk_user_creation", e) @mcp.tool(name="setup_user_group_template", description="Create a user group template with predefined privileges") async def setup_user_group_template(ctx: Context, template_name: str, privileges: str, description: str = "") -> str: """Create a user group template with predefined privileges for common roles. Args: ctx: MCP context template_name: Name for the group template privileges: Comma-separated list of privilege names to assign to the group description: Description of the group's purpose Returns: JSON response with group creation status """ try: client = await get_opnsense_client() # Convert privileges string to list privilege_list = [p.strip() for p in privileges.split(",") if p.strip()] # Validate privileges exist privileges_response = await client.request("GET", API_CORE_AUTH_PRIVILEGES, operation="validate_privileges_for_template") if "privileges" not in privileges_response: return json.dumps({"error": "Could not retrieve available privileges"}, indent=2) available_privileges = set(privileges_response["privileges"].keys()) invalid_privileges = [p for p in privilege_list if p not in available_privileges] if invalid_privileges: return json.dumps({ "error": "Invalid privileges specified", "invalid_privileges": invalid_privileges, "available_privileges": list(available_privileges) }, indent=2) # Create the group group_data = { "group": { "name": template_name, "description": description or f"Template group: {template_name}", "priv": ",".join(privilege_list) } } response = await client.request("POST", API_CORE_GROUP_ADD, data=group_data, operation="create_group_template") if response.get("result") != "saved": return json.dumps({"error": "Failed to create group template", "response": response}, indent=2) # Reload configuration await client.request("POST", API_CORE_CONFIG_RELOAD, operation="reload_config_after_template_creation") return json.dumps({ "result": "success", "message": f"Group template '{template_name}' created successfully", "uuid": response.get("uuid"), "privileges_assigned": privilege_list, "privilege_count": len(privilege_list) }, indent=2) except Exception as e: return await handle_tool_error(ctx, "setup_user_group_template", e)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/floriangrousset/opnsense-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server