Skip to main content
Glama
users.py • 11 kB
"""
User management tools for TrueNAS
"""

from typing import Dict, Any, List, Optional

from .base import BaseTool, tool_handler


class UserTools(BaseTool):
    """Tools for managing TrueNAS users"""

    def get_tool_definitions(self) -> list:
        """
        Get tool definitions for user management

        Returns:
            List of ``(name, handler, description, parameters)`` tuples, one
            per tool exposed by this class. ``parameters`` maps each argument
            name to its ``type`` and whether it is ``required``.
        """
        return [
            ("list_users", self.list_users, "List all users in TrueNAS", {}),
            ("get_user", self.get_user, "Get detailed information about a specific user",
             {"username": {"type": "string", "required": True}}),
            ("create_user", self.create_user, "Create a new user",
             {"username": {"type": "string", "required": True},
              "full_name": {"type": "string", "required": False},
              "email": {"type": "string", "required": False},
              "password": {"type": "string", "required": True},
              "shell": {"type": "string", "required": False},
              "home": {"type": "string", "required": False},
              "groups": {"type": "array", "required": False},
              # Accepted by create_user() but previously not advertised.
              "sudo": {"type": "boolean", "required": False}}),
            ("update_user", self.update_user, "Update an existing user",
             {"username": {"type": "string", "required": True},
              "updates": {"type": "object", "required": True}}),
            ("delete_user", self.delete_user, "Delete a user",
             {"username": {"type": "string", "required": True},
              # Accepted by delete_user() but previously not advertised.
              "delete_home": {"type": "boolean", "required": False}}),
        ]

    async def _find_user(self, username: str) -> Optional[Dict[str, Any]]:
        """
        Look up a single user record by username

        Fetches the full ``/user`` listing and scans it, returning the first
        entry whose ``"username"`` field equals ``username``. Callers are
        expected to have awaited ``ensure_initialized()`` already.

        Args:
            username: Username to look up

        Returns:
            The matching user dictionary, or ``None`` when no entry matches
        """
        users = await self.client.get("/user")
        return next(
            (user for user in users if user.get("username") == username),
            None,
        )

    @tool_handler
    async def list_users(self) -> Dict[str, Any]:
        """
        List all users in TrueNAS

        Returns:
            Dictionary containing list of users and metadata (total,
            built-in, regular and locked counts)
        """
        await self.ensure_initialized()

        users = await self.client.get("/user")

        # Keep only the summary fields for each user; absent flags default
        # to False and absent groups to an empty list.
        user_list = [
            {
                "id": user.get("id"),
                "username": user.get("username"),
                "full_name": user.get("full_name"),
                "email": user.get("email"),
                "uid": user.get("uid"),
                "groups": user.get("groups", []),
                "shell": user.get("shell"),
                "home": user.get("home"),
                "locked": user.get("locked", False),
                "sudo": user.get("sudo", False),
                "builtin": user.get("builtin", False),
            }
            for user in users
        ]

        # Every user is either built-in or regular, so the regular count is
        # the remainder; no intermediate lists are needed.
        system_count = sum(1 for u in user_list if u["builtin"])
        locked_count = sum(1 for u in user_list if u["locked"])

        return {
            "success": True,
            "users": user_list,
            "metadata": {
                "total_count": len(user_list),
                "system_users": system_count,
                "regular_users": len(user_list) - system_count,
                "locked_users": locked_count,
            },
        }

    @tool_handler
    async def get_user(self, username: str) -> Dict[str, Any]:
        """
        Get detailed information about a specific user

        Args:
            username: Username to look up

        Returns:
            Dictionary containing user details, or ``success: False`` with an
            error message when the username is not found
        """
        await self.ensure_initialized()

        target_user = await self._find_user(username)
        if target_user is None:
            return {
                "success": False,
                "error": f"User '{username}' not found"
            }

        # "gid" is only extracted when the "group" field is a dictionary;
        # any other shape (or a missing field) yields None.
        primary_group = target_user.get("group")
        gid = primary_group.get("gid") if isinstance(primary_group, dict) else None

        user_details = {
            "id": target_user.get("id"),
            "username": target_user.get("username"),
            "full_name": target_user.get("full_name"),
            "email": target_user.get("email"),
            "uid": target_user.get("uid"),
            "gid": gid,
            "groups": target_user.get("groups", []),
            "shell": target_user.get("shell"),
            "home": target_user.get("home"),
            "locked": target_user.get("locked", False),
            "sudo": target_user.get("sudo", False),
            "builtin": target_user.get("builtin", False),
            "microsoft_account": target_user.get("microsoft_account", False),
            "attributes": target_user.get("attributes", {}),
            "sshpubkey": target_user.get("sshpubkey"),
            "created": target_user.get("created"),
            "modified": target_user.get("modified"),
        }

        return {
            "success": True,
            "user": user_details
        }

    @tool_handler
    async def create_user(
        self,
        username: str,
        password: str,
        full_name: Optional[str] = None,
        email: Optional[str] = None,
        shell: Optional[str] = "/bin/bash",
        home: Optional[str] = None,
        groups: Optional[List[str]] = None,
        sudo: bool = False
    ) -> Dict[str, Any]:
        """
        Create a new user

        Args:
            username: Username for the new user
            password: Password for the new user
            full_name: Full name of the user (falls back to the username)
            email: Email address
            shell: User shell (default: /bin/bash)
            home: Home directory (``/home/<username>`` if not specified)
            groups: List of group names to add user to; names that do not
                match any entry returned by ``/group`` are ignored
            sudo: Whether to grant sudo privileges

        Returns:
            Dictionary containing created user information
        """
        await self.ensure_initialized()

        user_data = {
            "username": username,
            "password": password,
            "full_name": full_name or username,
            "email": email,
            "shell": shell,
            "sudo": sudo,
            "home": home or f"/home/{username}",
        }

        if groups:
            # Translate the requested group names into group ids; the
            # "groups" key is only sent when at least one name matched.
            all_groups = await self.client.get("/group")
            valid_groups = [g["id"] for g in all_groups if g["name"] in groups]
            if valid_groups:
                user_data["groups"] = valid_groups

        created_user = await self.client.post("/user", user_data)

        return {
            "success": True,
            "message": f"User '{username}' created successfully",
            "user": {
                "id": created_user.get("id"),
                "username": created_user.get("username"),
                "uid": created_user.get("uid"),
                "home": created_user.get("home"),
                "shell": created_user.get("shell")
            }
        }

    @tool_handler
    async def update_user(self, username: str, updates: Dict[str, Any]) -> Dict[str, Any]:
        """
        Update an existing user

        Args:
            username: Username to update
            updates: Dictionary of fields to update; keys outside the
                allowed set are dropped before the request is sent

        Returns:
            Dictionary containing updated user information, or
            ``success: False`` when the user is not found or no allowed
            field was supplied
        """
        await self.ensure_initialized()

        target_user = await self._find_user(username)
        if target_user is None:
            return {
                "success": False,
                "error": f"User '{username}' not found"
            }

        # Only these fields are forwarded in the update request.
        allowed_updates = {
            "full_name", "email", "shell", "home",
            "locked", "sudo", "password", "groups", "sshpubkey"
        }
        filtered_updates = {
            key: value for key, value in updates.items() if key in allowed_updates
        }

        if not filtered_updates:
            return {
                "success": False,
                "error": "No valid fields to update"
            }

        user_id = target_user["id"]
        updated_user = await self.client.put(f"/user/id/{user_id}", filtered_updates)

        return {
            "success": True,
            "message": f"User '{username}' updated successfully",
            "updated_fields": list(filtered_updates.keys()),
            "user": {
                "id": updated_user.get("id"),
                "username": updated_user.get("username"),
                "full_name": updated_user.get("full_name"),
                "email": updated_user.get("email")
            }
        }

    @tool_handler
    async def delete_user(self, username: str, delete_home: bool = False) -> Dict[str, Any]:
        """
        Delete a user

        Refuses to run unless ``settings.enable_destructive_operations`` is
        set, and refuses to delete users flagged as ``builtin``.

        Args:
            username: Username to delete
            delete_home: Whether to delete the user's home directory

        Returns:
            Dictionary confirming deletion, or ``success: False`` with an
            error message when the operation is refused or the user is not
            found
        """
        await self.ensure_initialized()

        # Checked before any request is made, so a disabled configuration
        # never touches the API.
        if not self.settings.enable_destructive_operations:
            return {
                "success": False,
                "error": "Destructive operations are disabled. Enable TRUENAS_ENABLE_DESTRUCTIVE_OPS to allow user deletion."
            }

        target_user = await self._find_user(username)
        if target_user is None:
            return {
                "success": False,
                "error": f"User '{username}' not found"
            }

        if target_user.get("builtin"):
            return {
                "success": False,
                "error": f"Cannot delete built-in system user '{username}'"
            }

        user_id = target_user["id"]
        # "delete_group" is always requested; "delete_home" is only included
        # when the caller opted in.
        delete_options = {"delete_group": True}
        if delete_home:
            delete_options["delete_home"] = True

        await self.client.delete(f"/user/id/{user_id}", delete_options)

        return {
            "success": True,
            "message": f"User '{username}' deleted successfully",
            "deleted": {
                "username": username,
                "uid": target_user.get("uid"),
                "home_deleted": delete_home
            }
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vespo92/TrueNasCoreMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.