Skip to main content
Glama
sharing.pyโ€ข16.5 kB
""" Sharing tools for TrueNAS (SMB, NFS, iSCSI) """ from typing import Dict, Any, List, Optional from .base import BaseTool, tool_handler class SharingTools(BaseTool): """Tools for managing TrueNAS file sharing""" def get_tool_definitions(self) -> list: """Get tool definitions for sharing management""" return [ # SMB Tools ("list_smb_shares", self.list_smb_shares, "List all SMB shares", {}), ("create_smb_share", self.create_smb_share, "Create a new SMB share", {"path": {"type": "string", "required": True}, "name": {"type": "string", "required": True}, "comment": {"type": "string", "required": False}, "read_only": {"type": "boolean", "required": False}}), ("delete_smb_share", self.delete_smb_share, "Delete an SMB share", {"share_name": {"type": "string", "required": True}}), # NFS Tools ("list_nfs_exports", self.list_nfs_exports, "List all NFS exports", {}), ("create_nfs_export", self.create_nfs_export, "Create an NFS export", {"path": {"type": "string", "required": True}, "allowed_networks": {"type": "array", "required": False}, "read_only": {"type": "boolean", "required": False}, "maproot_user": {"type": "string", "required": False}, "maproot_group": {"type": "string", "required": False}}), ("delete_nfs_export", self.delete_nfs_export, "Delete an NFS export", {"export_id": {"type": "integer", "required": True}}), # iSCSI Tools ("list_iscsi_targets", self.list_iscsi_targets, "List all iSCSI targets", {}), ("create_iscsi_target", self.create_iscsi_target, "Create an iSCSI target", {"name": {"type": "string", "required": True}, "alias": {"type": "string", "required": False}}), ] # SMB Share Management @tool_handler async def list_smb_shares(self) -> Dict[str, Any]: """ List all SMB shares Returns: Dictionary containing list of SMB shares """ await self.ensure_initialized() shares = await self.client.get("/sharing/smb") share_list = [] for share in shares: share_info = { "id": share.get("id"), "name": share.get("name"), "path": share.get("path"), "comment": share.get("comment"), "enabled": share.get("enabled", True), "read_only": share.get("ro", False), "browsable": share.get("browsable", True), "guest_ok": share.get("guestok", False), "hosts_allow": share.get("hostsallow", []), "hosts_deny": share.get("hostsdeny", []), "home": share.get("home", False), "timemachine": share.get("timemachine", False), "recyclebin": share.get("recyclebin", False), "audit": share.get("audit", {}) } share_list.append(share_info) return { "success": True, "shares": share_list, "metadata": { "total_shares": len(share_list), "enabled_shares": sum(1 for s in share_list if s["enabled"]), "read_only_shares": sum(1 for s in share_list if s["read_only"]), "guest_shares": sum(1 for s in share_list if s["guest_ok"]), "timemachine_shares": sum(1 for s in share_list if s["timemachine"]) } } @tool_handler async def create_smb_share( self, path: str, name: str, comment: str = "", read_only: bool = False, guest_ok: bool = False, browsable: bool = True, recyclebin: bool = False, hosts_allow: Optional[List[str]] = None, hosts_deny: Optional[List[str]] = None ) -> Dict[str, Any]: """ Create a new SMB share Args: path: Path to share (must exist) name: Share name comment: Optional comment read_only: Whether share is read-only guest_ok: Allow guest access browsable: Show in network browse list recyclebin: Enable recycle bin hosts_allow: List of allowed hosts/networks hosts_deny: List of denied hosts/networks Returns: Dictionary containing created share information """ await self.ensure_initialized() # Ensure path starts with /mnt/ if not path.startswith("/mnt/"): path = f"/mnt/{path}" share_data = { "path": path, "name": name, "comment": comment, "ro": read_only, "guestok": guest_ok, "browsable": browsable, "recyclebin": recyclebin, "enabled": True } if hosts_allow: share_data["hostsallow"] = hosts_allow if hosts_deny: share_data["hostsdeny"] = hosts_deny created = await self.client.post("/sharing/smb", share_data) return { "success": True, "message": f"SMB share '{name}' created successfully", "share": { "id": created.get("id"), "name": created.get("name"), "path": created.get("path"), "enabled": created.get("enabled") } } @tool_handler async def delete_smb_share(self, share_name: str) -> Dict[str, Any]: """ Delete an SMB share Args: share_name: Name of the share to delete Returns: Dictionary confirming deletion """ await self.ensure_initialized() # Find the share shares = await self.client.get("/sharing/smb") target_share = None for share in shares: if share.get("name") == share_name: target_share = share break if not target_share: return { "success": False, "error": f"SMB share '{share_name}' not found" } # Delete the share share_id = target_share["id"] await self.client.delete(f"/sharing/smb/id/{share_id}") return { "success": True, "message": f"SMB share '{share_name}' deleted successfully", "deleted": { "name": share_name, "path": target_share.get("path") } } # NFS Export Management @tool_handler async def list_nfs_exports(self) -> Dict[str, Any]: """ List all NFS exports Returns: Dictionary containing list of NFS exports """ await self.ensure_initialized() exports = await self.client.get("/sharing/nfs") export_list = [] for export in exports: export_info = { "id": export.get("id"), "path": export.get("path"), "comment": export.get("comment"), "enabled": export.get("enabled", True), "read_only": export.get("ro", False), "maproot_user": export.get("maproot_user"), "maproot_group": export.get("maproot_group"), "mapall_user": export.get("mapall_user"), "mapall_group": export.get("mapall_group"), "networks": export.get("networks", []), "hosts": export.get("hosts", []), "alldirs": export.get("alldirs", False), "security": export.get("security", []) } export_list.append(export_info) return { "success": True, "exports": export_list, "metadata": { "total_exports": len(export_list), "enabled_exports": sum(1 for e in export_list if e["enabled"]), "read_only_exports": sum(1 for e in export_list if e["read_only"]), "alldirs_exports": sum(1 for e in export_list if e["alldirs"]) } } @tool_handler async def create_nfs_export( self, path: str, allowed_networks: Optional[List[str]] = None, allowed_hosts: Optional[List[str]] = None, read_only: bool = False, maproot_user: str = "root", maproot_group: str = "wheel", alldirs: bool = False, comment: str = "" ) -> Dict[str, Any]: """ Create an NFS export Args: path: Path to export (must exist) allowed_networks: List of allowed networks (e.g., ["10.0.0.0/24"]) allowed_hosts: List of allowed hosts read_only: Whether export is read-only maproot_user: User to map root to maproot_group: Group to map root to alldirs: Allow mounting of subdirectories comment: Optional comment Returns: Dictionary containing created export information """ await self.ensure_initialized() # Ensure path starts with /mnt/ if not path.startswith("/mnt/"): path = f"/mnt/{path}" # Default to allow all if no restrictions specified if not allowed_networks and not allowed_hosts: allowed_networks = ["0.0.0.0/0"] export_data = { "path": path, "comment": comment, "ro": read_only, "maproot_user": maproot_user, "maproot_group": maproot_group, "alldirs": alldirs, "enabled": True } if allowed_networks: export_data["networks"] = allowed_networks if allowed_hosts: export_data["hosts"] = allowed_hosts created = await self.client.post("/sharing/nfs", export_data) # Generate example mount command server_ip = str(self.settings.truenas_url).replace("https://", "").replace("http://", "").split(":")[0] mount_example = f"mount -t nfs {server_ip}:{path} /local/mount/point" return { "success": True, "message": f"NFS export created for path '{path}'", "export": { "id": created.get("id"), "path": created.get("path"), "networks": created.get("networks", []), "enabled": created.get("enabled") }, "mount_example": mount_example } @tool_handler async def delete_nfs_export(self, export_id: int) -> Dict[str, Any]: """ Delete an NFS export Args: export_id: ID of the export to delete Returns: Dictionary confirming deletion """ await self.ensure_initialized() # Get the export details first exports = await self.client.get("/sharing/nfs") target_export = None for export in exports: if export.get("id") == export_id: target_export = export break if not target_export: return { "success": False, "error": f"NFS export with ID {export_id} not found" } # Delete the export await self.client.delete(f"/sharing/nfs/id/{export_id}") return { "success": True, "message": f"NFS export deleted successfully", "deleted": { "id": export_id, "path": target_export.get("path") } } # iSCSI Management @tool_handler async def list_iscsi_targets(self) -> Dict[str, Any]: """ List all iSCSI targets Returns: Dictionary containing list of iSCSI targets """ await self.ensure_initialized() targets = await self.client.get("/iscsi/target") extents = await self.client.get("/iscsi/extent") target_extents = await self.client.get("/iscsi/targetextent") # Map extents to targets target_extent_map = {} for te in target_extents: target_id = te.get("target") extent_id = te.get("extent") if target_id not in target_extent_map: target_extent_map[target_id] = [] target_extent_map[target_id].append(extent_id) # Build extent lookup extent_map = {e["id"]: e for e in extents} target_list = [] for target in targets: target_id = target.get("id") extent_ids = target_extent_map.get(target_id, []) target_extents_info = [] for extent_id in extent_ids: if extent_id in extent_map: extent = extent_map[extent_id] target_extents_info.append({ "id": extent["id"], "name": extent.get("name"), "type": extent.get("type"), "path": extent.get("path") or extent.get("disk"), "filesize": self.format_size(extent.get("filesize", 0)) if extent.get("filesize") else None, "enabled": extent.get("enabled", True) }) target_info = { "id": target.get("id"), "name": target.get("name"), "alias": target.get("alias"), "iqn": target.get("name"), # IQN is stored in name field "mode": target.get("mode"), "groups": target.get("groups", []), "extents": target_extents_info } target_list.append(target_info) return { "success": True, "targets": target_list, "metadata": { "total_targets": len(target_list), "total_extents": len(extents), "targets_with_extents": sum(1 for t in target_list if t["extents"]) } } @tool_handler async def create_iscsi_target( self, name: str, alias: Optional[str] = None, mode: str = "ISCSI", auth_networks: Optional[List[str]] = None ) -> Dict[str, Any]: """ Create an iSCSI target Args: name: Target name (will be part of IQN) alias: Optional alias for the target mode: Target mode (ISCSI or FC) auth_networks: List of authorized networks Returns: Dictionary containing created target information """ await self.ensure_initialized() # Generate IQN from datetime import datetime year_month = datetime.now().strftime("%Y-%m") iqn = f"iqn.{year_month}.com.truenas:{name}" target_data = { "name": iqn, "alias": alias or name, "mode": mode, "groups": [] } # Create auth group if networks specified if auth_networks: # This would require creating an auth group first # For now, we'll note this in the response pass created = await self.client.post("/iscsi/target", target_data) return { "success": True, "message": f"iSCSI target '{name}' created successfully", "target": { "id": created.get("id"), "iqn": created.get("name"), "alias": created.get("alias") }, "next_steps": [ "Create an extent (LUN) for this target", "Map the extent to this target", "Configure initiator groups if needed", "Enable iSCSI service if not already enabled" ] }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vespo92/TrueNasCoreMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server