Skip to main content
Glama

mcp-netwrixaccessanalayzer

Official
by netwrix
fs_tools.py (19.9 kB)
# mcp_naa/tools/report_tools.py
"""Predefined reporting tools, registered through ``app.mcp.tool``.

Queries without parameters are delegated to the ``run_query`` tool function.
Queries with parameters go through ``database.execute_query`` with ``?``
markers so user input is never concatenated into SQL.

NOTE: keep literal question marks out of the SQL comments of parameterized
queries. Some ODBC drivers may count them as parameter markers, which makes
the marker count disagree with the number of supplied parameters.
"""
from typing import Callable, Optional

from .. import app
from .. import database
from .db_tools import run_query  # Reuse the run_query tool for execution and formatting
from ..logging_config import get_logger
import pyodbc

logger = get_logger(__name__)

_NOT_CONNECTED_MESSAGE = "Not connected to a database. Please connect first."
_INVALID_TRUSTEE_MESSAGE = "Invalid trustee format. Use Domain\\Name."

# Shared existence probes used to explain empty result sets.
_RESOURCE_EXISTS_QUERY = "SELECT COUNT(*) FROM SA_FSAA_LookupUncPath(?)"
# The derived table's column is aliased explicitly: SQL Server rejects a
# derived table whose columns have no name.
_TRUSTEE_EXISTS_QUERY = """
    SELECT COUNT(*) FROM (
        SELECT 1 AS Found FROM SA_ADInventory_PrincipalsView WHERE NTAccount = ?
        UNION ALL
        SELECT 1 AS Found FROM SA_FSAA_LocalTrustees WHERE NTDomain + '\\' + NTName = ?
    ) AS t
"""


def _is_valid_trustee(trustee: str) -> bool:
    """Return True when *trustee* is non-empty and contains a backslash (Domain\\Name)."""
    return bool(trustee) and '\\' in trustee


def _is_unc_path(resource_path: str) -> bool:
    """Return True when *resource_path* is non-empty and starts with ``\\\\`` or ``//``."""
    return bool(resource_path) and resource_path.startswith(('\\\\', '//'))


def _resource_exists(resource_path: str) -> bool:
    """Return True when SA_FSAA_LookupUncPath yields at least one row for the path.

    Database errors propagate to the caller.
    """
    rows, _, _ = database.execute_query(_RESOURCE_EXISTS_QUERY, (resource_path,))
    return bool(rows) and rows[0][0] != 0


def _trustee_exists(trustee: str) -> bool:
    """Return True when the trustee is found among AD principals or local trustees.

    Database errors propagate to the caller.
    """
    rows, _, _ = database.execute_query(_TRUSTEE_EXISTS_QUERY, (trustee, trustee))
    return bool(rows) and rows[0][0] != 0


# Helper to check connection and run a predefined query using the run_query tool
def _run_predefined_query(query: str, tool_name: str) -> str:
    """Check the connection and execute a parameterless query via ``run_query``.

    Args:
        query: SQL text to execute as-is.
        tool_name: Tool name used in log messages and in the fallback error text.

    Returns:
        Whatever ``run_query`` returns, a "not connected" message when
        ``database.get_connection()`` is falsy, or a generic error message if
        the call itself raises.
    """
    if not database.get_connection():
        return _NOT_CONNECTED_MESSAGE
    try:
        logger.debug(f"Executing predefined query for {tool_name}")
        # We call the run_query *tool function* which handles execution,
        # formatting, and error reporting specific to tools.
        return run_query(query)
    except Exception as e:  # Catch unexpected errors during the call itself
        logger.error(f"Unexpected error in {tool_name} wrapper: {e}", exc_info=True)
        return f"An unexpected error occurred while preparing the query for {tool_name}."


def _run_parameterized_query(
    query: str,
    params: tuple,
    tool_name: str,
    action: str,
    subject: str,
    on_empty: Optional[Callable[[], str]] = None,
) -> str:
    """Execute a parameterized query and turn the outcome into a tool response.

    Args:
        query: SQL text using ``?`` parameter markers.
        params: Values bound to the markers, in order.
        tool_name: Tool name used in debug/warning log messages.
        action: Short noun phrase for error messages (e.g. "trustee access").
        subject: Already-quoted description of the inputs, used in error logs only.
        on_empty: Optional callable invoked when the query returns no rows; its
            return value becomes the response. It runs inside the same error
            handling, so follow-up queries it issues are covered too.

    Returns:
        The formatted result set, the ``on_empty`` message, or an error string.
        This function does not raise; every exception is converted to text.
    """
    if not database.get_connection():
        return _NOT_CONNECTED_MESSAGE
    try:
        logger.debug(f"Executing parameterized query for {tool_name}")
        # Use the core execute_query directly for parameterized queries
        rows, columns, _ = database.execute_query(query, params)
        if not rows and on_empty is not None:
            return on_empty()
        return database.format_results(rows, columns)
    except RuntimeError as e:
        # NOTE(review): presumably raised by the database layer for expected
        # failures; its message is returned verbatim - confirm.
        logger.warning(f"{tool_name} failed: {e}")
        return str(e)
    except pyodbc.Error as e:
        logger.error(f"Error getting {action} for {subject}: {e}", exc_info=True)
        sqlstate = e.args[0] if e.args else 'N/A'
        return f"Error getting {action} (SQLSTATE: {sqlstate}): {str(e)}"
    except Exception as e:
        logger.error(f"Unexpected error getting {action} for {subject}: {e}", exc_info=True)
        return f"Unexpected error getting {action}: {str(e)}"


@app.mcp.tool("Discover-SensitiveData")
def get_sensitive_data() -> str:
    """
    Discovers where sensitive data exists based on DLP matches.
    Returns a list of shares, criteria, and match counts.
    """
    tool_name = "Discover-SensitiveData"
    logger.info(f"Tool '{tool_name}' called.")
    query = """
        SELECT
            stv.HostName,
            stv.ShareName,
            mv.CriteriaName,
            SUM(mv.MatchCount) AS TotalMatches
        FROM SA_FSDLP_MatchesView AS mv
        LEFT JOIN SA_FSAA_SharesTraversalView AS stv
            ON mv.HostID = stv.HostID AND mv.ParentResourceID = stv.ResourceID
        -- WHERE clause might be needed if stv can have nulls for relevant matches
        WHERE stv.HostName IS NOT NULL AND stv.ShareName IS NOT NULL
        GROUP BY stv.HostName, stv.ShareName, mv.CriteriaName
        ORDER BY TotalMatches DESC;
    """
    return _run_predefined_query(query, tool_name)


@app.mcp.tool("Get-OpenShares")
def get_open_shares() -> str:
    """
    Discovers open shares (accessible to broad groups like 'Everyone' or 'Domain Users').
    Returns a list of shares and the count of folders directly within them marked as exceptions.
    Note: The original query's definition of 'open' seems tied to 'ExceptionsView', verify this logic.
    """
    tool_name = "Get-OpenShares"
    logger.info(f"Tool '{tool_name}' called.")
    # This query identifies shares associated with exceptions where ParentType=1.
    # Verify if SA_FSAA_ExceptionsView accurately represents 'open access'.
    # It might be better to query effective access for 'Everyone' or 'Domain Users'.
    # NOTE: the WHERE on ev.ParentType discards the NULL rows of the LEFT JOIN,
    # so shares without any matching exception are not listed.
    query = """
        SELECT
            stv.NetworkPath AS SharePath,
            COUNT(DISTINCT ev.ResourceID) AS ExceptionFolderCountInShare -- Count folders with exceptions directly in the share
        FROM (
            -- Select distinct shares at the root level
            SELECT DISTINCT NetworkPath, HostID, GateID
            FROM SA_FSAA_SharesTraversalView
            WHERE NestedLevel = 0 AND ResourceType = 2 -- Assuming ResourceType 2 is Share
        ) AS stv
        LEFT JOIN SA_FSAA_ExceptionsView AS ev
            ON stv.HostID = ev.HostID AND stv.GateID = ev.GateID
        WHERE ev.ParentType = 1 -- Filter based on exceptions view, definition might need review
        -- AND ev.ResourceType = 3 -- Optional: filter for folder exceptions only
        GROUP BY stv.NetworkPath
        ORDER BY stv.NetworkPath;
    """
    return _run_predefined_query(query, tool_name)


@app.mcp.tool("Get-TrusteeAccess")
def get_trustee_access(trustee: str, levels_down: int = 0) -> str:
    """
    Finds filesystem resources where a specific trustee (Domain\\Name) has access.
    Filters by levels down from the share root (0 = share level only).

    Args:
        trustee: Account in Domain\\Name form; must contain a backslash.
        levels_down: Non-negative nesting level matched against NestedLevel.

    Returns:
        Formatted query results, or a validation/connection/error message.
    """
    tool_name = "Get-TrusteeAccess"
    logger.info(f"Tool '{tool_name}' called for trustee: {trustee}, levels: {levels_down}")

    # Validate inputs; the values themselves are bound as query parameters.
    if not _is_valid_trustee(trustee):
        return _INVALID_TRUSTEE_MESSAGE
    if levels_down < 0:
        return "Levels down must be a non-negative integer."

    # The SQL comments below deliberately contain no question marks, so that
    # only the three real parameter markers are present in the statement.
    query = """
        -- Find the SID for the given trustee (AD or Local)
        WITH TrusteeSID AS (
            SELECT ObjectSID AS sid FROM SA_ADInventory_PrincipalsView WHERE NTAccount = ?
            UNION
            SELECT SID FROM SA_FSAA_LocalTrustees WHERE NTDomain + '\\' + NTName = ?
        )
        -- Find access using the SID
        SELECT
            'FileSystem' AS DataSource,
            stv.NetworkPath,
            -- Simplified Allow Rights (adjust bitmask values if needed)
            CASE WHEN (p.AllowRights & 1) <> 0 THEN 'R' ELSE '' END + -- Read
            CASE WHEN (p.AllowRights & 2) <> 0 THEN 'W' ELSE '' END + -- Write
            CASE WHEN (p.AllowRights & 4) <> 0 THEN 'D' ELSE '' END + -- Delete
            CASE WHEN (p.AllowRights & 32) <> 0 THEN 'L' ELSE '' END + -- List
            CASE WHEN (p.AllowRights & 8) <> 0 THEN 'M' ELSE '' END + -- Modify/ChangePerms (unverified)
            CASE WHEN (p.AllowRights & 16) <> 0 THEN 'A' ELSE '' END AS AllowRightsDescription, -- Full/Admin (unverified)
            -- Simplified Deny Rights
            CASE WHEN (p.DenyRights & 1) <> 0 THEN 'R' ELSE '' END +
            CASE WHEN (p.DenyRights & 2) <> 0 THEN 'W' ELSE '' END +
            CASE WHEN (p.DenyRights & 4) <> 0 THEN 'D' ELSE '' END +
            CASE WHEN (p.DenyRights & 32) <> 0 THEN 'L' ELSE '' END +
            CASE WHEN (p.DenyRights & 8) <> 0 THEN 'M' ELSE '' END +
            CASE WHEN (p.DenyRights & 16) <> 0 THEN 'A' ELSE '' END AS DenyRightsDescription
        FROM TrusteeSID AS ts
        CROSS JOIN SA_FSAA_SharesTraversalView AS stv
        -- Apply the function to get access for the SID on the resource
        OUTER APPLY SA_AIC_FSAA_GetTrusteeShareAccessEx(stv.HostID, stv.GateID, ts.sid, 0, 1) AS p
        WHERE stv.NestedLevel = ? -- Filter by levels down
          AND stv.ResourceType = 3 -- Assuming ResourceType 3 is Folder/File
          AND p.HOST IS NOT NULL; -- Ensure the function returned some access info
    """
    return _run_parameterized_query(
        query,
        (trustee, trustee, levels_down),
        tool_name,
        action="trustee access",
        subject=f"'{trustee}'",
    )


@app.mcp.tool("Get-TrusteePermissionSource")
def get_permission_source(trustee: str, resource_path: str) -> str:
    """For a given trustee (Domain\\Name) and network resource path (UNC), finds the source of their access.

    Args:
        trustee: Account in Domain\\Name form; must contain a backslash.
        resource_path: UNC path; must start with two backslashes or two slashes.

    Returns:
        Formatted query results; when there are none, a message saying whether
        the resource, the trustee, or the permission itself was not found; or a
        validation/connection/error message.
    """
    tool_name = "Get-TrusteePermissionSource"
    logger.info(f"Tool '{tool_name}' called for trustee: {trustee}, resource: {resource_path}")

    # Validate inputs; the values themselves are bound as query parameters.
    if not _is_valid_trustee(trustee):
        return _INVALID_TRUSTEE_MESSAGE
    if not _is_unc_path(resource_path):
        return "Invalid resource path format. Use UNC path (e.g., \\\\server\\share\\folder)."

    query = """
        -- Find the resource details
        WITH ResourceInfo AS (
            SELECT HostID, ResourceID, GateID
            FROM SA_FSAA_LookupUncPath(?) -- Parameterize resource path
        ),
        -- Find the SID for the given trustee
        TrusteeSID AS (
            SELECT ObjectSID AS sid FROM SA_ADInventory_PrincipalsView WHERE NTAccount = ? -- Parameterize trustee
            UNION
            SELECT SID FROM SA_FSAA_LocalTrustees WHERE NTDomain + '\\' + NTName = ? -- Parameterize trustee
        )
        -- Get permission source using resource details and trustee SID
        SELECT
            ps.SourceTrusteePath,
            ps.ResourcePath,
            ps.PermissionSourceType,
            ps.AllowRightsDescription,
            ps.AllowMaskDescription,
            ps.DenyRightsDescription,
            ps.DenyMaskDescription
        FROM ResourceInfo AS ri
        INNER JOIN TrusteeSID AS ts ON 1=1 -- Join trustee SID
        -- Apply function to get permission source
        OUTER APPLY SA_FSAA_GetTrusteePermissionSource(ri.HostID, ri.ResourceID, ri.GateID, ts.sid) AS ps
        WHERE ps.ResourcePath IS NOT NULL; -- Ensure the function returned results
    """

    def _explain_empty_result() -> str:
        # Check if resource path or trustee is invalid
        resource_found = _resource_exists(resource_path)
        trustee_found = _trustee_exists(trustee)
        if not resource_found:
            return f"Resource path '{resource_path}' not found or not scanned by Access Auditor."
        if not trustee_found:
            return f"Trustee '{trustee}' not found."
        return f"No specific permission source found for trustee '{trustee}' on resource '{resource_path}', or trustee has no permissions there."

    return _run_parameterized_query(
        query,
        (resource_path, trustee, trustee),
        tool_name,
        action="permission source",
        subject=f"'{trustee}' on '{resource_path}'",
        on_empty=_explain_empty_result,
    )


@app.mcp.tool("Get-ResourceAccess")
def get_resource_access(resource_path: str) -> str:
    """Gets the effective access list for a specific resource path (UNC).

    Args:
        resource_path: UNC path; must start with two backslashes or two slashes.

    Returns:
        Formatted query results; when there are none, a message saying whether
        the path itself was found; or a validation/connection/error message.
    """
    tool_name = "Get-ResourceAccess"
    logger.info(f"Tool '{tool_name}' called for resource: {resource_path}")

    # Validate input; the value itself is bound as a query parameter.
    if not _is_unc_path(resource_path):
        return "Invalid resource path format. Use UNC path (e.g., \\\\server\\share\\folder)."

    query = """
        -- Find resource details
        WITH ResourceInfo AS (
            SELECT HostID, ResourceID, GateID
            FROM SA_FSAA_LookupUncPath(?) -- Parameterize resource path
        )
        -- Get effective access using resource details
        SELECT
            -- eav.NetworkPath AS Share, -- This might be redundant if resource_path is specific
            stv.NetworkPath AS FolderFullPath, -- Get the actual path from traversal view
            eav.TrusteeNTStyleName,
            eav.AllowRightsDescription,
            eav.DenyRightsDescription
        FROM ResourceInfo AS ri
        LEFT JOIN SA_FSAA_EffectiveAccessView AS eav
            ON eav.HostID = ri.HostID AND eav.GateID = ri.GateID AND eav.ResourceID = ri.ResourceID
        LEFT JOIN SA_FSAA_SharesTraversalView AS stv -- Join to get the canonical path
            ON stv.HostID = ri.HostID AND stv.ResourceID = ri.ResourceID AND stv.GateID = ri.GateID
        WHERE eav.TrusteeNTStyleName IS NOT NULL -- Filter out potential nulls if any
        ORDER BY eav.TrusteeNTStyleName;
    """

    def _explain_empty_result() -> str:
        # Check if resource exists but has no explicit permissions listed
        if not _resource_exists(resource_path):
            return f"Resource path '{resource_path}' not found or not scanned by Access Auditor."
        # Could have implicit permissions (e.g., via inheritance not shown here) or be empty
        return f"Resource path '{resource_path}' found, but no specific effective access entries listed in SA_FSAA_EffectiveAccessView."

    return _run_parameterized_query(
        query,
        (resource_path,),
        tool_name,
        action="resource access",
        subject=f"'{resource_path}'",
        on_empty=_explain_empty_result,
    )


@app.mcp.tool("Get-UnusedAccess")
def get_unused_access(resource_path: str, days_inactive: int = 90) -> str:
    """
    For a specified share path, identifies users whose last activity was more than N days ago (default 90).

    Args:
        resource_path: UNC path of the share; must start with two backslashes or two slashes.
        days_inactive: Positive number of days without activity.

    Returns:
        Formatted query results; when there are none, a message saying whether
        the path itself was found; or a validation/connection/error message.
    """
    tool_name = "Get-UnusedAccess"
    logger.info(f"Tool '{tool_name}' called for resource: {resource_path}, days: {days_inactive}")

    # Validate inputs; the values themselves are bound as query parameters.
    if not _is_unc_path(resource_path):
        return "Invalid resource path format. Use UNC path (e.g., \\\\server\\share)."
    if days_inactive <= 0:
        return "Days inactive must be a positive integer."

    query = """
        -- Find the target share details
        WITH TargetShare AS (
            SELECT HostID, GateID, ResourceID
            FROM SA_FSAA_LookupUncPath(?) -- Parameterize resource path
            WHERE NestedLevel = 0 -- Ensure it is the share root
        ),
        -- Find last activity date per user on the target share/subfolders
        LastActivity AS (
            SELECT
                dav.HostID,
                dav.GateID,
                dav.UserNTStyleName,
                MAX(dav.ActivityDate) AS LastActivityDate
            FROM SA_FSAC_DailyActivityView AS dav
            INNER JOIN TargetShare AS ts ON dav.HostID = ts.HostID AND dav.GateID = ts.GateID
            -- We need to ensure the activity is *within* the target share tree.
            -- This requires joining activity folder path back to traversal view.
            INNER JOIN SA_FSAA_SharesTraversalView stv_activity
                ON dav.HostID = stv_activity.HostID AND dav.GateID = stv_activity.GateID AND dav.FolderID = stv_activity.ResourceID
            INNER JOIN SA_FSAA_SharesTraversalView stv_share
                ON dav.HostID = stv_share.HostID AND dav.GateID = stv_share.GateID AND stv_share.ResourceID = ts.ResourceID -- Match the share root
            WHERE stv_activity.NetworkPath LIKE stv_share.NetworkPath + '%' -- Activity path starts with share path
            GROUP BY dav.HostID, dav.GateID, dav.UserNTStyleName
        )
        -- Combine effective access with last activity
        SELECT
            stv.HostName,
            stv.ShareName,
            eav.TrusteeNTStyleName,
            eav.AllowRightsDescription,
            eav.DenyRightsDescription,
            la.LastActivityDate
        FROM TargetShare AS ts
        INNER JOIN SA_FSAA_SharesTraversalView AS stv -- Get share details
            ON ts.HostID = stv.HostID AND ts.GateID = stv.GateID AND ts.ResourceID = stv.ResourceID
        INNER JOIN SA_FSAA_EffectiveAccessView AS eav -- Get who has access
            ON ts.HostID = eav.HostID AND ts.GateID = eav.GateID AND ts.ResourceID = eav.ResourceID
        LEFT JOIN LastActivity AS la -- Join last activity info
            ON eav.HostID = la.HostID AND eav.GateID = la.GateID AND eav.TrusteeNTStyleName = la.UserNTStyleName
        -- Filter for AD Users only (optional, based on original query)
        WHERE EXISTS (
            SELECT 1 FROM SA_ADInventory_UsersView AS uv WHERE uv.NTAccount = eav.TrusteeNTStyleName
        )
        -- Filter for users whose last activity was longer ago than specified days, or never
        AND (la.LastActivityDate IS NULL OR la.LastActivityDate < DATEADD(day, ?, GETUTCDATE())) -- Bound as a negative day offset
        ORDER BY stv.HostName, stv.ShareName, eav.TrusteeNTStyleName;
    """
    # Note: The activity data join might be complex/slow depending on data volume.
    # Consider indexing relevant columns in SA_FSAC_DailyActivityView and SA_FSAA_SharesTraversalView.

    def _explain_empty_result() -> str:
        if not _resource_exists(resource_path):
            return f"Resource path '{resource_path}' not found or not scanned."
        return f"No users with unused access found for '{resource_path}' within the last {days_inactive} days, or activity data is unavailable."

    # The day offset is negated here rather than with a unary minus on the
    # parameter marker, so the driver binds a plain integer argument.
    return _run_parameterized_query(
        query,
        (resource_path, -days_inactive),
        tool_name,
        action="unused access",
        subject=f"'{resource_path}'",
        on_empty=_explain_empty_result,
    )


@app.mcp.tool("Get-RunningJobs")
def get_running_jobs() -> str:
    """Gets currently running Netwrix Access Auditor jobs from SA_JobStatsTbl."""
    tool_name = "Get-RunningJobs"
    logger.info(f"Tool '{tool_name}' called.")
    # A job counts as running when it has no EndTime.
    query = """
        SELECT *
        FROM SA_JobStatsTbl
        WHERE EndTime IS NULL
        ORDER BY JobRunTimeKey DESC;
    """
    return _run_predefined_query(query, tool_name)


@app.mcp.tool("Get-ShadowAccess")
def get_shadow_access() -> str:
    """
    Retrieves details about shadow access (potential ungoverned access routes).
    Returns results from SA_ShadowAccess_Details.
    """
    tool_name = "Get-ShadowAccess"
    logger.info(f"Tool '{tool_name}' called.")
    query = """
        SELECT *
        FROM SA_ShadowAccess_Details;
    """
    # Add ORDER BY if needed, e.g., ORDER BY TrusteeName, ResourcePath
    return _run_predefined_query(query, tool_name)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/netwrix/mcp-server-naa'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.