Skip to main content
Glama

mcp-netwrixaccessanalayzer

Official
by netwrix
ad_tools.py (45.8 kB)
from typing import Optional

from .. import app
from .. import database
from .db_tools import run_query  # Reuse the run_query tool for execution and formatting
from ..logging_config import get_logger
import pyodbc

logger = get_logger(__name__)


def _run_predefined_query(query: str, tool_name: str) -> str:
    """Run a fixed SQL statement through the shared ``run_query`` tool.

    Args:
        query: SQL text to execute.
        tool_name: Name of the calling tool; used only in log and error text.

    Returns:
        Whatever ``run_query`` returns, or a plain-text message when no
        database connection is available or the call raises.
    """
    if not database.get_connection():
        return "Not connected to a database. Please connect first."

    try:
        logger.debug(f"Executing predefined query for {tool_name}")
        # Delegate to the run_query *tool function*, which takes care of
        # execution, formatting and tool-specific error reporting.
        outcome = run_query(query)
    except Exception as exc:  # Anything unexpected raised by the call itself
        logger.error(f"Unexpected error in {tool_name} wrapper: {exc}", exc_info=True)
        return f"An unexpected error occurred while preparing the query for {tool_name}."
    return outcome


@app.mcp.tool("Get-ADEffectiveMembership")
def get_effective_group_members(
    group_dn_filter: Optional[str] = None,
    group_nt_account_filter: Optional[str] = None,
    member_dn_filter: Optional[str] = None,
    member_nt_account_filter: Optional[str] = None,
    member_object_class_filter: Optional[str] = None,
    group_sid_filter: Optional[str] = None,
    member_sid_filter: Optional[str] = None
) -> str:
    """
    Lists effective (nested) group membership in Active Directory.

    Each returned row pairs a group with one of its effective members and the
    nesting level of that membership. Every filter is optional and is applied
    with SQL LIKE, so supply wildcards (%) yourself where needed
    (e.g., '%Admin%', 'Domain\\User%'). Empty filters are ignored.

    Args:
        group_dn_filter: LIKE pattern for GroupDistinguishedName.
        group_nt_account_filter: LIKE pattern for GroupNTAccount (e.g., 'DOMAIN\\GroupName%').
        member_dn_filter: LIKE pattern for MemberDistinguishedName.
        member_nt_account_filter: LIKE pattern for MemberNTAccount.
        member_object_class_filter: LIKE pattern for MemberObjectClassName (e.g., 'user', 'group').
        group_sid_filter: LIKE pattern for GroupObjectSid (e.g., 'S-1-5-%').
        member_sid_filter: LIKE pattern for MemberObjectSid.

    Returns:
        The formatted result table, or a plain-text message when there is no
        connection, nothing matches, or the query fails.
    """
    tool_name = "Get-ADEffectiveMembership"
    logger.info(f"Tool '{tool_name}' called with filters: "
                f"GroupDN='{group_dn_filter}', GroupNT='{group_nt_account_filter}', "
                f"MemberDN='{member_dn_filter}', MemberNT='{member_nt_account_filter}', "
                f"MemberClass='{member_object_class_filter}', GroupSID='{group_sid_filter}', "
                f"MemberSID='{member_sid_filter}'")

    if not database.get_connection():
        return "Not connected to a database. Please connect first."

    select_clause = """
    SELECT
        GroupDistinguishedName,
        GroupDomainCanonicalName,
        GroupNTAccount,
        GroupObjectSid,
        MemberDistinguishedName,
        MemberDomainCanonicalName,
        MemberNTAccount,
        MemberObjectSid,
        MemberObjectClassName,
        MinNestingLevel AS NestingLevel
    FROM SA_ADInventory_EffectiveGroupMembersView with (nolock)
    """

    # (column, supplied pattern) pairs, in the order the WHERE clause is built.
    column_filters = (
        ("GroupDistinguishedName", group_dn_filter),
        ("GroupNTAccount", group_nt_account_filter),
        ("GroupObjectSid", group_sid_filter),
        ("MemberDistinguishedName", member_dn_filter),
        ("MemberNTAccount", member_nt_account_filter),
        ("MemberObjectSid", member_sid_filter),
        ("MemberObjectClassName", member_object_class_filter),
    )
    # Only filters with a non-empty value contribute a condition.
    supplied = [(column, pattern) for column, pattern in column_filters if pattern]
    conditions = [f"{column} LIKE ?" for column, _ in supplied]
    params = [pattern for _, pattern in supplied]

    final_query = select_clause
    if conditions:
        final_query += " WHERE " + " AND ".join(conditions)
    # Fixed ordering keeps results consistent between calls.
    final_query += " ORDER BY GroupNTAccount, MemberNTAccount;"

    logger.debug(f"Constructed query for {tool_name}: {final_query}")
    logger.debug(f"Query parameters: {tuple(params)}")

    try:
        # Values are bound as parameters rather than spliced into the SQL text.
        rows, columns, _ = database.execute_query(final_query, tuple(params))
        if not rows:
            if conditions:
                return f"No effective group memberships found matching the specified filters."
            # Unlikely unless the view itself is empty.
            return f"No effective group memberships found in the view."
        return database.format_results(rows, columns)
    except pyodbc.Error as db_err:
        logger.error(f"Database error executing {tool_name} query: {db_err}", exc_info=True)
        sqlstate = db_err.args[0] if db_err.args else 'N/A'
        return f"Database error getting effective memberships (SQLSTATE: {sqlstate}): {db_err}"
    except Exception as err:
        logger.error(f"Unexpected error in {tool_name}: {err}", exc_info=True)
        return f"An unexpected error occurred while getting effective memberships: {err}"
@app.mcp.tool("Get-ADExceptions")
def get_ad_exceptions(
    name_filter: Optional[str] = None,
    description_filter: Optional[str] = None,
    principal_nt_account_filter: Optional[str] = None,
    object_class_filter: Optional[str] = None
) -> str:
    """
    Lists the exceptions recorded in the AD inventory.

    Each row shows the exception name, its description, the associated
    principal (NT Account) and that principal's object class. The data comes
    from SA_ADInventory_ExceptionsView. Every filter is optional and is
    applied with SQL LIKE; include wildcards (%) where needed
    (e.g., '%Admin%', 'user%'). Empty filters are ignored.

    Args:
        name_filter: LIKE pattern for the exception's 'Name'.
        description_filter: LIKE pattern for 'Description'.
        principal_nt_account_filter: LIKE pattern for 'PrincipalNTAccount'.
        object_class_filter: LIKE pattern for 'PrincipalObjectClassName' (e.g., 'user', 'group').

    Returns:
        The formatted result table, or a plain-text message when there is no
        connection, nothing matches, or the query fails.
    """
    tool_name = "Get-ADExceptions"
    logger.info(f"Tool '{tool_name}' called with filters: "
                f"Name='{name_filter}', Description='{description_filter}', "
                f"Principal='{principal_nt_account_filter}', Class='{object_class_filter}'")

    if not database.get_connection():
        return "Not connected to a database. Please connect first."

    select_clause = """
    SELECT
        Name AS ExceptionName,
        Description,
        PrincipalNTAccount,
        PrincipalObjectClassName AS ObjectClass
    FROM SA_ADInventory_ExceptionsView with (nolock)
    """

    # Column / pattern pairs in WHERE-clause order. Description is a large
    # text field, so filtering on it might be slow.
    candidate_filters = (
        ("Name", name_filter),
        ("Description", description_filter),
        ("PrincipalNTAccount", principal_nt_account_filter),
        ("PrincipalObjectClassName", object_class_filter),
    )
    conditions = []
    params = []
    for column, pattern in candidate_filters:
        if pattern:
            conditions.append(column + " LIKE ?")
            params.append(pattern)

    where_clause = (" WHERE " + " AND ".join(conditions)) if conditions else ""
    # Fixed ordering keeps results consistent between calls.
    final_query = select_clause + where_clause + " ORDER BY ExceptionName;"

    logger.debug(f"Constructed query for {tool_name}: {final_query}")
    logger.debug(f"Query parameters: {tuple(params)}")

    try:
        rows, columns, _ = database.execute_query(final_query, tuple(params))
        if rows:
            # Render the rows as a string table.
            return database.format_results(rows, columns)
        # Tailor the empty-result message to whether filters were applied.
        if conditions:
            return f"No AD exceptions found matching the specified filters."
        return "No AD exceptions found in SA_ADInventory_ExceptionsView."
    except pyodbc.Error as db_err:
        logger.error(f"Database error executing {tool_name} query: {db_err}", exc_info=True)
        sqlstate = db_err.args[0] if db_err.args else 'N/A'
        return f"Database error getting AD exceptions (SQLSTATE: {sqlstate}): {db_err}"
    except Exception as err:
        logger.error(f"Unexpected error in {tool_name}: {err}", exc_info=True)
        return f"An unexpected error occurred while getting AD exceptions: {err}"
@app.mcp.tool("Get-ADPermissions")
def get_ad_permissions(
    # Filters for Access Entry details
    access_entry_type_filter: Optional[str] = None,
    access_entry_sid_filter: Optional[str] = None,
    principal_name_filter: Optional[str] = None,
    principal_domain_filter: Optional[str] = None,
    permission_filter: Optional[str] = None,
    apply_to_filter: Optional[str] = None,
    property_name_filter: Optional[str] = None,
    is_inherited_filter: Optional[str] = None,  # Use '0' for False, '1' for True
    # Filters for the Object the permission applies TO
    dn_filter: Optional[str] = None,
    object_sid_filter: Optional[str] = None,
    object_class_filter: Optional[str] = None,
    object_domain_filter: Optional[str] = None,  # Domain column
    # Filters for the Object Owner
    owner_sid_filter: Optional[str] = None,
    owner_domain_filter: Optional[str] = None
) -> str:
    """
    Lists Active Directory permissions from SA_ADPerms_PermissionsView.

    Deleted entries (isdeleted <> 0) are always excluded. All filters are
    optional. Every filter except 'is_inherited_filter' is applied with SQL
    LIKE (include wildcards '%' yourself). 'is_inherited_filter' is an exact
    match and must be '0' (False/Not Inherited) or '1' (True/Inherited).

    Args:
        access_entry_type_filter: LIKE pattern for AccessEntryType (e.g., 'ALLOW', 'DENY').
        access_entry_sid_filter: LIKE pattern for AccessEntryObjectSID.
        principal_name_filter: LIKE pattern for AccessEntryPrincipalName.
        principal_domain_filter: LIKE pattern for AccessEntryDomain.
        permission_filter: LIKE pattern for AccessEntryPermission (e.g., 'GenericRead', '%Write%').
        apply_to_filter: LIKE pattern for AccessEntryApplyTo.
        property_name_filter: LIKE pattern for AccessEntryPropertyName.
        is_inherited_filter: Exact filter for IsInherited ('0' or '1').
        dn_filter: LIKE pattern for the object's DistinguishedName.
        object_sid_filter: LIKE pattern for the object's ObjectSid.
        object_class_filter: LIKE pattern for the object's ObjectClass.
        object_domain_filter: LIKE pattern for the object's Domain.
        owner_sid_filter: LIKE pattern for OwnerSid.
        owner_domain_filter: LIKE pattern for OwnerDomain.

    Returns:
        The formatted result table, or a plain-text message when the
        inherited flag is invalid, there is no connection, nothing matches,
        or the query fails.
    """
    tool_name = "Get-ADPermissions"
    logger.info(f"Tool '{tool_name}' called with filters.")  # Keep log concise

    # Validate the inherited flag before touching the database.
    inherited_flag = None
    if is_inherited_filter is not None:
        if is_inherited_filter not in ('0', '1'):
            return "Invalid value for is_inherited_filter. Please use '0' or '1'."
        inherited_flag = int(is_inherited_filter)

    if not database.get_connection():
        return "Not connected to a database. Please connect first."

    # The base statement already carries the mandatory isdeleted filter, so
    # optional conditions are chained on with AND rather than WHERE.
    select_clause = """
    SELECT
        AccessEntryType,
        AccessEntryObjectSID,
        AccessEntryPrincipalName,
        AccessEntryDomain,
        AccessEntryPermission,
        AccessEntryApplyTo,
        AccessEntryPropertyName,
        IsInherited,
        DistinguishedName,
        ObjectSid,
        ObjectClass,
        OwnerSid,
        Domain,
        OwnerDomain
    FROM SA_ADPerms_PermissionsView with (nolock)
    WHERE isdeleted = 0
    """

    # (column, supplied pattern) pairs for the LIKE filters, in clause order.
    like_filters = (
        ("AccessEntryType", access_entry_type_filter),
        ("AccessEntryObjectSID", access_entry_sid_filter),
        ("AccessEntryPrincipalName", principal_name_filter),
        ("AccessEntryDomain", principal_domain_filter),
        ("AccessEntryPermission", permission_filter),
        ("AccessEntryApplyTo", apply_to_filter),
        ("AccessEntryPropertyName", property_name_filter),
        ("DistinguishedName", dn_filter),
        ("ObjectSid", object_sid_filter),
        ("ObjectClass", object_class_filter),
        ("OwnerSid", owner_sid_filter),
        ("Domain", object_domain_filter),  # Object's Domain
        ("OwnerDomain", owner_domain_filter),
    )
    supplied = [(column, pattern) for column, pattern in like_filters if pattern is not None]
    conditions = [f"{column} LIKE ?" for column, _ in supplied]
    params = [pattern for _, pattern in supplied]

    if inherited_flag is not None:
        conditions.append("IsInherited = ?")
        params.append(inherited_flag)  # Bound as the integer 0 or 1

    final_query = select_clause
    if conditions:
        final_query += " AND " + " AND ".join(conditions)
    final_query += " ORDER BY DistinguishedName, AccessEntryPrincipalName;"

    logger.debug(f"Constructed query for {tool_name}: {final_query}")
    logger.debug(f"Query parameters: {tuple(params)}")

    try:
        rows, columns, _ = database.execute_query(final_query, tuple(params))
        if rows:
            return database.format_results(rows, columns)
        # Tailor the empty-result message to whether optional filters were added.
        if conditions:
            return f"No AD permissions found matching the specified filters (and isdeleted = 0)."
        return "No non-deleted AD permissions found in SA_ADPerms_PermissionsView."
    except pyodbc.Error as db_err:
        logger.error(f"Database error executing {tool_name} query: {db_err}", exc_info=True)
        sqlstate = db_err.args[0] if db_err.args else 'N/A'
        return f"Database error getting AD permissions (SQLSTATE: {sqlstate}): {db_err}"
    except Exception as err:
        logger.error(f"Unexpected error in {tool_name}: {err}", exc_info=True)
        return f"An unexpected error occurred while getting AD permissions: {err}"
@app.mcp.tool("Get-DomainControllers")
def get_domain_controllers() -> str:
    """
    Retrieves a list of domain controllers from the SA_AD_DCSummary_List view.

    Returns:
        The output of the shared predefined-query helper: the query result,
        or a plain-text message when not connected or on failure.
    """
    tool_name = "Get-DomainControllers"
    logger.info(f"Tool '{tool_name}' called.")
    query = """
    select * from SA_AD_DCSummary_List
    """
    return _run_predefined_query(query, tool_name)


@app.mcp.tool("Get-CertificateVulnerabilities")
def get_certificate_vulnerabilities() -> str:
    """
    Retrieves certificate vulnerability findings from the
    SA_AD_CertificateAudit_Vulnerabilities view.

    Columns returned: Certificate Authority, Name, Published,
    Template Vulnerabilities, Authority Vulnerabilities.

    Returns:
        The output of the shared predefined-query helper: the query result,
        or a plain-text message when not connected or on failure.
    """
    tool_name = "Get-CertificateVulnerabilities"
    logger.info(f"Tool '{tool_name}' called.")
    query = """
    select [Certificate Authority], [Name], [Published],
           [Template Vulnerabilities], [Authority Vulnerabilities]
    from SA_AD_CertificateAudit_Vulnerabilities
    """
    return _run_predefined_query(query, tool_name)


@app.mcp.tool("Get-ADCARights")
def get_adca_rights() -> str:
    """
    Retrieves certificate authority access rights from the
    SA_AD_CertificateAudit_CARights view.

    Columns returned: Certificate Authority, CA String, Domain, NTAccount,
    SID, Access, Ace Type, Direct.

    Returns:
        The output of the shared predefined-query helper: the query result,
        or a plain-text message when not connected or on failure.
    """
    tool_name = "Get-ADCARights"
    logger.info(f"Tool '{tool_name}' called.")
    query = """
    SELECT [Certificate Authority]
         , [CA String]
         , [Domain]
         , [NTAccount]
         , [SID]
         , [Access]
         , [Ace Type]
         , [Direct]
    FROM [SA_AD_CertificateAudit_CARights]
    """
    return _run_predefined_query(query, tool_name)


@app.mcp.tool("Get-ADSecurityAssessment")
def get_ad_security_assessment() -> str:
    """
    Retrieves results from the Access Analyzer AD Security Assessment.

    Reads Category, Check, Finding and Risk from
    SA_AD_SecurityAssessment_Results.

    Returns:
        The output of the shared predefined-query helper: the query result,
        or a plain-text message when not connected or on failure.
    """
    tool_name = "Get-ADSecurityAssessment"
    logger.info(f"Tool '{tool_name}' called.")
    query = """
    select Category, [Check], Finding, Risk
    from SA_AD_SecurityAssessment_Results
    """
    return _run_predefined_query(query, tool_name)


@app.mcp.tool("Get-ADUsers")
def get_ad_users(
    # --- Identification Filters ---
    domain_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    domain_name_filter: Optional[str] = None,  # LIKE
    domain_canonical_filter: Optional[str] = None,  # LIKE
    principal_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    sam_account_name_filter: Optional[str] = None,  # LIKE
    nt_account_filter: Optional[str] = None,  # LIKE
    display_name_filter: Optional[str] = None,  # LIKE
    description_filter: Optional[str] = None,  # LIKE
    usn_filter: Optional[str] = None,  # LIKE (can be large number)
    object_sid_filter: Optional[str] = None,  # LIKE
    dn_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    distinguished_name_filter: Optional[str] = None,  # LIKE
    cn_filter: Optional[str] = None,  # LIKE
    employee_id_filter: Optional[str] = None,  # LIKE
    # --- Date/Time Filters ---
    when_created_filter: Optional[str] = None,  # LIKE (e.g., '2024-04%')
    when_changed_filter: Optional[str] = None,  # LIKE
    last_logon_filter: Optional[str] = None,  # LIKE (for LastLogonTimestamp)
    account_expires_filter: Optional[str] = None,  # LIKE (or '0' for never)
    pwd_last_set_filter: Optional[str] = None,  # LIKE (for PwdLastSetDate)
    # --- Contact/Org Filters ---
    mail_filter: Optional[str] = None,  # LIKE
    title_filter: Optional[str] = None,  # LIKE
    company_filter: Optional[str] = None,  # LIKE
    department_filter: Optional[str] = None,  # LIKE
    manager_principal_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    telephone_filter: Optional[str] = None,  # LIKE
    # --- Exchange Filters ---
    legacy_exchange_dn_filter: Optional[str] = None,  # LIKE
    home_mdb_filter: Optional[str] = None,  # LIKE
    mailbox_store_filter: Optional[str] = None,  # LIKE
    storage_group_filter: Optional[str] = None,  # LIKE
    exchange_server_filter: Optional[str] = None,  # LIKE
    # --- Account Control Flag Filters (Use '0' or '1') ---
    account_disabled_filter: Optional[str] = None,  # = (ACCOUNTDISABLE)
    homedir_required_filter: Optional[str] = None,  # = (HOMEDIR_REQUIRED)
    lockout_filter: Optional[str] = None,  # = (LOCKOUT)
    passwd_notreqd_filter: Optional[str] = None,  # = (PASSWD_NOTREQD)
    passwd_cant_change_filter: Optional[str] = None,  # = (PASSWD_CANT_CHANGE)
    encrypted_pwd_allowed_filter: Optional[str] = None,  # = (ENCRYPTED_TEXT_PWD_ALLOWED)
    temp_duplicate_filter: Optional[str] = None,  # = (TEMP_DUPLICATE_ACCOUNT)
    normal_account_filter: Optional[str] = None,  # = (NORMAL_ACCOUNT)
    interdomain_trust_filter: Optional[str] = None,  # = (INTERDOMAIN_TRUST_ACCOUNT)
    workstation_trust_filter: Optional[str] = None,  # = (WORKSTATION_TRUST_ACCOUNT)
    server_trust_filter: Optional[str] = None,  # = (SERVER_TRUST_ACCOUNT)
    dont_expire_pwd_filter: Optional[str] = None,  # = (DONT_EXPIRE_PASSWORD)
    mns_logon_filter: Optional[str] = None,  # = (MNS_LOGON_ACCOUNT)
    smartcard_required_filter: Optional[str] = None,  # = (SMARTCARD_REQUIRED)
    trusted_for_delegation_filter: Optional[str] = None,  # = (TRUSTED_FOR_DELEGATION)
    not_delegated_filter: Optional[str] = None,  # = (NOT_DELEGATED)
    use_des_key_filter: Optional[str] = None,  # = (USE_DES_KEY_ONLY)
    dont_req_preauth_filter: Optional[str] = None,  # = (DONT_REQ_PREAUTH)
    password_expired_filter: Optional[str] = None,  # = (PASSWORD_EXPIRED)
    auth_for_delegation_filter: Optional[str] = None  # = (TRUSTED_TO_AUTH_FOR_DELEGATION)
) -> str:
    """
    Retrieves Active Directory user details from SA_ADInventory_UsersView,
    excluding deleted users (IsDeleted = 0 is always applied).

    All filters are optional and are combined with AND.

    LIKE filters (include wildcards '%' yourself); the matched column is the
    one named in parentheses:
        domain_name_filter (DomainName), domain_canonical_filter
        (DomainCanonicalName), sam_account_name_filter (SamAccountName),
        nt_account_filter (NTAccount), display_name_filter (DisplayName),
        description_filter (Description), usn_filter (USN),
        object_sid_filter (ObjectSid), distinguished_name_filter
        (DistinguishedName), cn_filter (Cn), employee_id_filter (EmployeeId),
        when_created_filter (WhenCreated, e.g. '2024-04%'),
        when_changed_filter (WhenChanged), last_logon_filter
        (LastLogonTimestamp), account_expires_filter (AccountExpires),
        pwd_last_set_filter (PwdLastSetDate), mail_filter (Mail),
        title_filter (Title), company_filter (Company), department_filter
        (Department), telephone_filter (TelephoneNumber),
        legacy_exchange_dn_filter (LegacyExchangeDN), home_mdb_filter
        (HomeMDB), mailbox_store_filter (MailboxStore), storage_group_filter
        (StorageGroup), exchange_server_filter (ExchangeServer).

    Exact-match filters (value is passed to the database as given):
        domain_id_filter (DomainId), principal_id_filter (PrincipalId),
        dn_id_filter (DnId), manager_principal_id_filter (ManagerPrincipalId).

    Account control flag filters (must be '0' or '1', exact match):
        account_disabled_filter (ACCOUNTDISABLE), homedir_required_filter
        (HOMEDIR_REQUIRED), lockout_filter (LOCKOUT), passwd_notreqd_filter
        (PASSWD_NOTREQD), passwd_cant_change_filter (PASSWD_CANT_CHANGE),
        encrypted_pwd_allowed_filter (ENCRYPTED_TEXT_PWD_ALLOWED),
        temp_duplicate_filter (TEMP_DUPLICATE_ACCOUNT), normal_account_filter
        (NORMAL_ACCOUNT), interdomain_trust_filter
        (INTERDOMAIN_TRUST_ACCOUNT), workstation_trust_filter
        (WORKSTATION_TRUST_ACCOUNT), server_trust_filter
        (SERVER_TRUST_ACCOUNT), dont_expire_pwd_filter (DONT_EXPIRE_PASSWORD),
        mns_logon_filter (MNS_LOGON_ACCOUNT), smartcard_required_filter
        (SMARTCARD_REQUIRED), trusted_for_delegation_filter
        (TRUSTED_FOR_DELEGATION), not_delegated_filter (NOT_DELEGATED),
        use_des_key_filter (USE_DES_KEY_ONLY), dont_req_preauth_filter
        (DONT_REQ_PREAUTH), password_expired_filter (PASSWORD_EXPIRED),
        auth_for_delegation_filter (TRUSTED_TO_AUTH_FOR_DELEGATION).

    Returns:
        The formatted result table, or a plain-text message when a flag value
        is invalid, there is no connection, nothing matches, or the query
        fails.
    """
    tool_name = "Get-ADUsers"
    logger.info(f"Tool '{tool_name}' called with filters.")  # Keep log concise

    # --- Input validation for the boolean-like flags ---
    # Each entry keeps the real parameter name next to its column so an
    # invalid value can be reported under the name the caller actually used.
    flag_filters = (
        ("ACCOUNTDISABLE", "account_disabled_filter", account_disabled_filter),
        ("HOMEDIR_REQUIRED", "homedir_required_filter", homedir_required_filter),
        ("LOCKOUT", "lockout_filter", lockout_filter),
        ("PASSWD_NOTREQD", "passwd_notreqd_filter", passwd_notreqd_filter),
        ("PASSWD_CANT_CHANGE", "passwd_cant_change_filter", passwd_cant_change_filter),
        ("ENCRYPTED_TEXT_PWD_ALLOWED", "encrypted_pwd_allowed_filter", encrypted_pwd_allowed_filter),
        ("TEMP_DUPLICATE_ACCOUNT", "temp_duplicate_filter", temp_duplicate_filter),
        ("NORMAL_ACCOUNT", "normal_account_filter", normal_account_filter),
        ("INTERDOMAIN_TRUST_ACCOUNT", "interdomain_trust_filter", interdomain_trust_filter),
        ("WORKSTATION_TRUST_ACCOUNT", "workstation_trust_filter", workstation_trust_filter),
        ("SERVER_TRUST_ACCOUNT", "server_trust_filter", server_trust_filter),
        ("DONT_EXPIRE_PASSWORD", "dont_expire_pwd_filter", dont_expire_pwd_filter),
        ("MNS_LOGON_ACCOUNT", "mns_logon_filter", mns_logon_filter),
        ("SMARTCARD_REQUIRED", "smartcard_required_filter", smartcard_required_filter),
        ("TRUSTED_FOR_DELEGATION", "trusted_for_delegation_filter", trusted_for_delegation_filter),
        ("NOT_DELEGATED", "not_delegated_filter", not_delegated_filter),
        ("USE_DES_KEY_ONLY", "use_des_key_filter", use_des_key_filter),
        ("DONT_REQ_PREAUTH", "dont_req_preauth_filter", dont_req_preauth_filter),
        ("PASSWORD_EXPIRED", "password_expired_filter", password_expired_filter),
        ("TRUSTED_TO_AUTH_FOR_DELEGATION", "auth_for_delegation_filter", auth_for_delegation_filter),
    )
    validated_flags = []
    for column, param_name, raw_value in flag_filters:
        if raw_value is None:
            continue
        if raw_value not in ('0', '1'):
            return f"Invalid value '{raw_value}' for {param_name}. Please use '0' or '1'."
        validated_flags.append((column, int(raw_value)))

    if not database.get_connection():
        return "Not connected to a database. Please connect first."

    # The base statement already carries the mandatory IsDeleted filter, so
    # optional conditions are chained on with AND rather than WHERE.
    base_query = """
    SELECT
        DomainId, DomainName, DomainCanonicalName, PrincipalId, SamAccountName,
        NTAccount, DisplayName, Description, USN, IsDeleted, ObjectSid, DnId,
        DistinguishedName, Cn, EmployeeId, WhenCreated, WhenChanged, Mail,
        LastLogonTimestamp, AccountExpires, Title, Company, Department,
        ManagerPrincipalId, UserAccountControl, msDSUserAccountControlComputed,
        TelephoneNumber, PwdLastSetDate, LegacyExchangeDN, HomeMDB, MailboxStore,
        StorageGroup, ExchangeServer, ACCOUNTDISABLE, HOMEDIR_REQUIRED, LOCKOUT,
        PASSWD_NOTREQD, PASSWD_CANT_CHANGE, ENCRYPTED_TEXT_PWD_ALLOWED,
        TEMP_DUPLICATE_ACCOUNT, NORMAL_ACCOUNT, INTERDOMAIN_TRUST_ACCOUNT,
        WORKSTATION_TRUST_ACCOUNT, SERVER_TRUST_ACCOUNT, DONT_EXPIRE_PASSWORD,
        MNS_LOGON_ACCOUNT, SMARTCARD_REQUIRED, TRUSTED_FOR_DELEGATION,
        NOT_DELEGATED, USE_DES_KEY_ONLY, DONT_REQ_PREAUTH, PASSWORD_EXPIRED,
        TRUSTED_TO_AUTH_FOR_DELEGATION
    FROM SA_ADInventory_UsersView with (nolock)
    WHERE IsDeleted = 0
    """

    conditions = []
    params = []

    # Columns matched with LIKE. UserAccountControl and
    # msDSUserAccountControlComputed are deliberately not filterable here.
    like_filters = (
        ("DomainName", domain_name_filter),
        ("DomainCanonicalName", domain_canonical_filter),
        ("SamAccountName", sam_account_name_filter),
        ("NTAccount", nt_account_filter),
        ("DisplayName", display_name_filter),
        ("Description", description_filter),
        ("USN", usn_filter),  # Treat USN as string for LIKE
        ("ObjectSid", object_sid_filter),
        ("DistinguishedName", distinguished_name_filter),
        ("Cn", cn_filter),
        ("EmployeeId", employee_id_filter),
        ("WhenCreated", when_created_filter),
        ("WhenChanged", when_changed_filter),
        ("LastLogonTimestamp", last_logon_filter),
        ("AccountExpires", account_expires_filter),
        ("PwdLastSetDate", pwd_last_set_filter),
        ("Mail", mail_filter),
        ("Title", title_filter),
        ("Company", company_filter),
        ("Department", department_filter),
        ("TelephoneNumber", telephone_filter),
        ("LegacyExchangeDN", legacy_exchange_dn_filter),
        ("HomeMDB", home_mdb_filter),
        ("MailboxStore", mailbox_store_filter),
        ("StorageGroup", storage_group_filter),
        ("ExchangeServer", exchange_server_filter),
    )
    for column, filter_value in like_filters:
        if filter_value is not None:
            conditions.append(f"{column} LIKE ?")
            params.append(filter_value)

    # Columns matched with '='. Values are bound exactly as supplied; no
    # numeric validation is performed here.
    equals_filters = (
        ("DomainId", domain_id_filter),
        ("PrincipalId", principal_id_filter),
        ("DnId", dn_id_filter),
        ("ManagerPrincipalId", manager_principal_id_filter),
    )
    for column, filter_value in equals_filters:
        if filter_value is not None:
            conditions.append(f"{column} = ?")
            params.append(filter_value)

    # Validated flags are bound as the integers 0 or 1.
    for column, flag_value in validated_flags:
        conditions.append(f"{column} = ?")
        params.append(flag_value)

    final_query = base_query
    if conditions:
        final_query += " AND " + " AND ".join(conditions)
    final_query += " ORDER BY NTAccount;"

    logger.debug(f"Constructed query for {tool_name}: {final_query}")
    logger.debug(f"Query parameters: {tuple(params)}")

    try:
        rows, columns, _ = database.execute_query(final_query, tuple(params))
        if not rows:
            # Tailor the message to whether optional filters were added.
            if conditions:
                return "No AD users found matching the specified filters (and IsDeleted = 0)."
            return "No non-deleted AD users found in SA_ADInventory_UsersView."
        return database.format_results(rows, columns)
    except pyodbc.Error as e:
        logger.error(f"Database error executing {tool_name} query: {e}", exc_info=True)
        sqlstate = e.args[0] if e.args else 'N/A'
        return f"Database error getting AD users (SQLSTATE: {sqlstate}): {e}"
    except Exception as e:
        logger.error(f"Unexpected error in {tool_name}: {e}", exc_info=True)
        return f"An unexpected error occurred while getting AD users: {e}"
@app.mcp.tool("Get-ADGroups")
def get_ad_groups(
    # --- Identification Filters ---
    domain_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    domain_name_filter: Optional[str] = None,  # LIKE
    domain_canonical_filter: Optional[str] = None,  # LIKE
    principal_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    sam_account_name_filter: Optional[str] = None,  # LIKE
    nt_account_filter: Optional[str] = None,  # LIKE
    display_name_filter: Optional[str] = None,  # LIKE
    description_filter: Optional[str] = None,  # LIKE
    usn_filter: Optional[str] = None,  # LIKE (can be large number)
    is_deleted_filter: Optional[str] = None,  # Exact Match ('0' or '1')
    object_sid_filter: Optional[str] = None,  # LIKE
    dn_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    distinguished_name_filter: Optional[str] = None,  # LIKE
    cn_filter: Optional[str] = None,  # LIKE
    # --- Date Filters ---
    when_created_filter: Optional[str] = None,  # LIKE (e.g., '2024-04%')
    when_changed_filter: Optional[str] = None,  # LIKE
    # --- Group Specific Filters ---
    group_type_filter: Optional[str] = None,  # LIKE (e.g., 'Security', 'Distribution') or numeric value
    group_scope_filter: Optional[str] = None,  # LIKE (e.g., 'Global', 'DomainLocal', 'Universal') or numeric value
    group_target_filter: Optional[str] = None,  # LIKE
    # --- Other Attributes ---
    mail_filter: Optional[str] = None,  # LIKE
    managed_by_principal_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    # Range comparisons (> <) are not supported for the count; it is an exact match.
    direct_member_count_filter: Optional[str] = None,  # = (exact count)
) -> str:
    """
    Retrieves Active Directory group details from SA_ADInventory_GroupsView.

    All filters are optional and combined with AND. Deleted groups are
    included unless 'is_deleted_filter' says otherwise.
    Most filters use SQL LIKE logic (include wildcards '%').
    ID filters (DomainId, PrincipalId, DnId, ManagedByPrincipalId) and
    'direct_member_count_filter' use exact match (=).
    'is_deleted_filter' requires '0' or '1' for exact match.

    Args:
        (Numerous filter arguments - see parameter list above)
        is_deleted_filter: Optional filter ('0' or '1') to show only non-deleted or only deleted groups.

    Returns:
        The formatted result table, or a plain-text message when the deleted
        flag is invalid, there is no connection, nothing matches, or the
        query fails.
    """
    tool_name = "Get-ADGroups"
    logger.info(f"Tool '{tool_name}' called with filters.")  # Keep log concise

    # Validate the deleted flag before touching the database.
    deleted_flag = None
    if is_deleted_filter is not None:
        if is_deleted_filter not in ('0', '1'):
            return "Invalid value for is_deleted_filter. Please use '0' or '1'."
        deleted_flag = int(is_deleted_filter)

    if not database.get_connection():
        return "Not connected to a database. Please connect first."

    select_clause = """
    SELECT
        DomainId, DomainName, DomainCanonicalName, PrincipalId, SamAccountName,
        NTAccount, DisplayName, Description, USN, IsDeleted, ObjectSid, DnId,
        DistinguishedName, Cn, WhenCreated, WhenChanged, GroupType, GroupScope,
        GroupTarget, Mail, ManagedByPrincipalId, DirectMemberCount
    FROM SA_ADInventory_GroupsView with (nolock)
    """

    # (column, operator, supplied value) triples in WHERE-clause order:
    # LIKE columns first, then the exact-match columns.
    filter_specs = (
        ("DomainName", "LIKE", domain_name_filter),
        ("DomainCanonicalName", "LIKE", domain_canonical_filter),
        ("SamAccountName", "LIKE", sam_account_name_filter),
        ("NTAccount", "LIKE", nt_account_filter),
        ("DisplayName", "LIKE", display_name_filter),
        ("Description", "LIKE", description_filter),
        ("USN", "LIKE", usn_filter),  # Treat USN as string for LIKE
        ("ObjectSid", "LIKE", object_sid_filter),
        ("DistinguishedName", "LIKE", distinguished_name_filter),
        ("Cn", "LIKE", cn_filter),
        ("WhenCreated", "LIKE", when_created_filter),
        ("WhenChanged", "LIKE", when_changed_filter),
        ("GroupType", "LIKE", group_type_filter),
        ("GroupScope", "LIKE", group_scope_filter),
        ("GroupTarget", "LIKE", group_target_filter),
        ("Mail", "LIKE", mail_filter),
        ("DomainId", "=", domain_id_filter),
        ("PrincipalId", "=", principal_id_filter),
        ("DnId", "=", dn_id_filter),
        ("ManagedByPrincipalId", "=", managed_by_principal_id_filter),
        ("DirectMemberCount", "=", direct_member_count_filter),  # Using = for count
    )
    conditions = []
    params = []
    for column, operator, supplied in filter_specs:
        if supplied is None:
            continue
        conditions.append(f"{column} {operator} ?")
        params.append(supplied)

    if deleted_flag is not None:
        conditions.append("IsDeleted = ?")
        params.append(deleted_flag)  # Bound as the integer 0 or 1

    # No mandatory filter in the base statement, so conditions start a WHERE.
    where_clause = (" WHERE " + " AND ".join(conditions)) if conditions else ""
    final_query = select_clause + where_clause + " ORDER BY NTAccount;"

    logger.debug(f"Constructed query for {tool_name}: {final_query}")
    logger.debug(f"Query parameters: {tuple(params)}")

    try:
        rows, columns, _ = database.execute_query(final_query, tuple(params))
        if rows:
            return database.format_results(rows, columns)
        # Tailor the empty-result message to whether filters were applied.
        if conditions:
            return f"No AD groups found matching the specified filters."
        return "No AD groups found in SA_ADInventory_GroupsView."
    except pyodbc.Error as db_err:
        logger.error(f"Database error executing {tool_name} query: {db_err}", exc_info=True)
        sqlstate = db_err.args[0] if db_err.args else 'N/A'
        return f"Database error getting AD groups (SQLSTATE: {sqlstate}): {db_err}"
    except Exception as err:
        logger.error(f"Unexpected error in {tool_name}: {err}", exc_info=True)
        return f"An unexpected error occurred while getting AD groups: {err}"
@app.mcp.tool("Get-ADComputers")
def get_ad_computers(
    # --- Identification Filters ---
    domain_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    domain_name_filter: Optional[str] = None,  # LIKE
    domain_canonical_filter: Optional[str] = None,  # LIKE
    principal_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    sam_account_name_filter: Optional[str] = None,  # LIKE (Often ends with $)
    nt_account_filter: Optional[str] = None,  # LIKE
    display_name_filter: Optional[str] = None,  # LIKE
    description_filter: Optional[str] = None,  # LIKE
    usn_filter: Optional[str] = None,  # LIKE (can be large number)
    object_sid_filter: Optional[str] = None,  # LIKE
    distinguished_name_filter: Optional[str] = None,  # LIKE
    cn_filter: Optional[str] = None,  # LIKE
    # --- Date/Time Filters ---
    when_created_filter: Optional[str] = None,  # LIKE (e.g., '2024-04%')
    when_changed_filter: Optional[str] = None,  # LIKE
    last_logon_filter: Optional[str] = None,  # LIKE (for LastLogonTimestamp)
    pwd_last_set_filter: Optional[str] = None,  # LIKE (for PwdLastSetDate)
    # --- Computer Specific Filters ---
    dns_hostname_filter: Optional[str] = None,  # LIKE
    os_filter: Optional[str] = None,  # LIKE (OperatingSystem)
    os_version_filter: Optional[str] = None,  # LIKE (OperatingSystemVersion)
    os_sp_filter: Optional[str] = None,  # LIKE (OperatingSystemServicePack)
    location_filter: Optional[str] = None,  # LIKE
    # --- Management Filter ---
    managed_by_principal_id_filter: Optional[str] = None,  # Exact Match (usually numeric)
    # --- Account Control Flag Filters (Use '0' or '1') ---
    account_disabled_filter: Optional[str] = None,  # = (ACCOUNTDISABLE)
    homedir_required_filter: Optional[str] = None,  # = (HOMEDIR_REQUIRED)
    lockout_filter: Optional[str] = None,  # = (LOCKOUT)
    passwd_notreqd_filter: Optional[str] = None,  # = (PASSWD_NOTREQD)
    passwd_cant_change_filter: Optional[str] = None,  # = (PASSWD_CANT_CHANGE)
    encrypted_pwd_allowed_filter: Optional[str] = None,  # = (ENCRYPTED_TEXT_PWD_ALLOWED)
    temp_duplicate_filter: Optional[str] = None,  # = (TEMP_DUPLICATE_ACCOUNT)
    normal_account_filter: Optional[str] = None,  # = (NORMAL_ACCOUNT)
    interdomain_trust_filter: Optional[str] = None,  # = (INTERDOMAIN_TRUST_ACCOUNT)
    workstation_trust_filter: Optional[str] = None,  # = (WORKSTATION_TRUST_ACCOUNT)
    server_trust_filter: Optional[str] = None,  # = (SERVER_TRUST_ACCOUNT)
    dont_expire_pwd_filter: Optional[str] = None,  # = (DONT_EXPIRE_PASSWORD)
    mns_logon_filter: Optional[str] = None,  # = (MNS_LOGON_ACCOUNT)
    smartcard_required_filter: Optional[str] = None,  # = (SMARTCARD_REQUIRED)
    trusted_for_delegation_filter: Optional[str] = None,  # = (TRUSTED_FOR_DELEGATION)
    not_delegated_filter: Optional[str] = None,  # = (NOT_DELEGATED)
    use_des_key_filter: Optional[str] = None,  # = (USE_DES_KEY_ONLY)
    dont_req_preauth_filter: Optional[str] = None,  # = (DONT_REQ_PREAUTH)
    password_expired_filter: Optional[str] = None,  # = (PASSWORD_EXPIRED)
    auth_for_delegation_filter: Optional[str] = None  # = (TRUSTED_TO_AUTH_FOR_DELEGATION)
) -> str:
    """
    Retrieves Active Directory computer details from SA_ADInventory_ComputersView,
    excluding deleted computers (the query always applies ``IsDeleted = 0``).

    Every filter is optional; each supplied filter is added to the WHERE clause
    with AND and bound as a query parameter (never interpolated into the SQL).

    Args:
        domain_id_filter, principal_id_filter, managed_by_principal_id_filter:
            Exact-match (``=``) filters on DomainId, PrincipalId and
            ManagedByPrincipalId.
        domain_name_filter, domain_canonical_filter, sam_account_name_filter,
        nt_account_filter, display_name_filter, description_filter, usn_filter,
        object_sid_filter, distinguished_name_filter, cn_filter,
        when_created_filter, when_changed_filter, last_logon_filter,
        pwd_last_set_filter, dns_hostname_filter, os_filter, os_version_filter,
        os_sp_filter, location_filter:
            SQL LIKE filters on the corresponding view column; include
            wildcards ('%') where needed.
        account_disabled_filter ... auth_for_delegation_filter:
            Account control flag filters. Each must be the string '0' or '1'
            and is compared for equality against the flag column noted next to
            the parameter in the signature.

    Returns:
        The rows formatted by ``database.format_results``, or a plain-text
        message when a flag value is invalid, no database connection exists,
        no rows match, or the query fails.
    """
    tool_name = "Get-ADComputers"
    logger.info(f"Tool '{tool_name}' called with filters.")  # Keep log concise

    # --- Input Validation for Boolean-like Flags ---
    # Each entry: (view column, public parameter name, supplied value).
    # The parameter name is carried along so that a validation error points the
    # caller at the argument they actually passed, not at the database column.
    flag_filters = (
        ("ACCOUNTDISABLE", "account_disabled_filter", account_disabled_filter),
        ("HOMEDIR_REQUIRED", "homedir_required_filter", homedir_required_filter),
        ("LOCKOUT", "lockout_filter", lockout_filter),
        ("PASSWD_NOTREQD", "passwd_notreqd_filter", passwd_notreqd_filter),
        ("PASSWD_CANT_CHANGE", "passwd_cant_change_filter", passwd_cant_change_filter),
        ("ENCRYPTED_TEXT_PWD_ALLOWED", "encrypted_pwd_allowed_filter", encrypted_pwd_allowed_filter),
        ("TEMP_DUPLICATE_ACCOUNT", "temp_duplicate_filter", temp_duplicate_filter),
        ("NORMAL_ACCOUNT", "normal_account_filter", normal_account_filter),
        ("INTERDOMAIN_TRUST_ACCOUNT", "interdomain_trust_filter", interdomain_trust_filter),
        ("WORKSTATION_TRUST_ACCOUNT", "workstation_trust_filter", workstation_trust_filter),
        ("SERVER_TRUST_ACCOUNT", "server_trust_filter", server_trust_filter),
        ("DONT_EXPIRE_PASSWORD", "dont_expire_pwd_filter", dont_expire_pwd_filter),
        ("MNS_LOGON_ACCOUNT", "mns_logon_filter", mns_logon_filter),
        ("SMARTCARD_REQUIRED", "smartcard_required_filter", smartcard_required_filter),
        ("TRUSTED_FOR_DELEGATION", "trusted_for_delegation_filter", trusted_for_delegation_filter),
        ("NOT_DELEGATED", "not_delegated_filter", not_delegated_filter),
        ("USE_DES_KEY_ONLY", "use_des_key_filter", use_des_key_filter),
        ("DONT_REQ_PREAUTH", "dont_req_preauth_filter", dont_req_preauth_filter),
        ("PASSWORD_EXPIRED", "password_expired_filter", password_expired_filter),
        ("TRUSTED_TO_AUTH_FOR_DELEGATION", "auth_for_delegation_filter", auth_for_delegation_filter),
    )

    validated_bool_params = {}
    for column, param_name, value in flag_filters:
        if value is None:
            continue
        if value not in ("0", "1"):
            return f"Invalid value '{value}' for {param_name}. Please use '0' or '1'."
        validated_bool_params[column] = int(value)
    # --- End Validation ---

    # Check for database connection
    if not database.get_connection():
        return "Not connected to a database. Please connect first."

    # Define the base SQL query including the mandatory filter
    base_query = """
    SELECT
        DomainId, DomainName, DomainCanonicalName, PrincipalId, SamAccountName,
        NTAccount, DisplayName, Description, USN, IsDeleted, ObjectSid,
        DistinguishedName, Cn, WhenCreated, WhenChanged, DnsHostName,
        OperatingSystem, OperatingSystemVersion, OperatingSystemServicePack,
        ManagedByPrincipalId, Location, LastLogonTimestamp, PwdLastSetDate,
        UserAccountControl, ACCOUNTDISABLE, HOMEDIR_REQUIRED, LOCKOUT,
        PASSWD_NOTREQD, PASSWD_CANT_CHANGE, ENCRYPTED_TEXT_PWD_ALLOWED,
        TEMP_DUPLICATE_ACCOUNT, NORMAL_ACCOUNT, INTERDOMAIN_TRUST_ACCOUNT,
        WORKSTATION_TRUST_ACCOUNT, SERVER_TRUST_ACCOUNT, DONT_EXPIRE_PASSWORD,
        MNS_LOGON_ACCOUNT, SMARTCARD_REQUIRED, TRUSTED_FOR_DELEGATION,
        NOT_DELEGATED, USE_DES_KEY_ONLY, DONT_REQ_PREAUTH, PASSWORD_EXPIRED,
        TRUSTED_TO_AUTH_FOR_DELEGATION
    FROM SA_ADInventory_ComputersView with (nolock)
    WHERE IsDeleted = 0
    """

    # --- Dynamic Query Building for Optional Filters ---
    # `conditions` and `params` are appended to in lockstep so that the n-th
    # "?" placeholder always binds the n-th parameter.
    conditions = []
    params = []

    # Map filters requiring LIKE
    # NOTE(review): WhenCreated / WhenChanged / LastLogonTimestamp /
    # PwdLastSetDate are matched with LIKE. If these view columns are
    # datetime-typed, the server compares against its default string rendering,
    # so a pattern such as '2024-04%' may not match - confirm column types.
    like_filter_map = {
        "DomainName": domain_name_filter,
        "DomainCanonicalName": domain_canonical_filter,
        "SamAccountName": sam_account_name_filter,
        "NTAccount": nt_account_filter,
        "DisplayName": display_name_filter,
        "Description": description_filter,
        "USN": usn_filter,  # Treat USN as string for LIKE
        "ObjectSid": object_sid_filter,
        "DistinguishedName": distinguished_name_filter,
        "Cn": cn_filter,
        "WhenCreated": when_created_filter,
        "WhenChanged": when_changed_filter,
        "LastLogonTimestamp": last_logon_filter,
        "PwdLastSetDate": pwd_last_set_filter,
        "DnsHostName": dns_hostname_filter,
        "OperatingSystem": os_filter,
        "OperatingSystemVersion": os_version_filter,
        "OperatingSystemServicePack": os_sp_filter,
        "Location": location_filter,
        # UserAccountControl is intentionally not filterable here; the
        # individual flag columns above cover it.
    }
    for column, filter_value in like_filter_map.items():
        if filter_value is not None:
            conditions.append(f"{column} LIKE ?")
            params.append(filter_value)

    # Map filters requiring =
    equals_filter_map = {
        "DomainId": domain_id_filter,
        "PrincipalId": principal_id_filter,
        "ManagedByPrincipalId": managed_by_principal_id_filter,
    }
    for column, filter_value in equals_filter_map.items():
        if filter_value is not None:
            # Could add validation here to ensure value is numeric if needed
            conditions.append(f"{column} = ?")
            params.append(filter_value)

    # Add validated boolean filters (already converted to int 0/1)
    for column, value in validated_bool_params.items():
        conditions.append(f"{column} = ?")
        params.append(value)

    # Construct the final query
    final_query = base_query
    if conditions:
        # The base query already has a WHERE clause, so extend it with AND
        final_query += " AND " + " AND ".join(conditions)

    # Add ordering
    final_query += " ORDER BY NTAccount;"
    # --- End Dynamic Query Building ---

    logger.debug(f"Constructed query for {tool_name}: {final_query}")
    logger.debug(f"Query parameters: {tuple(params)}")

    try:
        # Execute the potentially parameterized query
        rows, columns, _ = database.execute_query(final_query, tuple(params))

        if not rows:
            if conditions:  # Optional filters were actually added
                return "No AD computers found matching the specified filters (and IsDeleted = 0)."
            return "No non-deleted AD computers found in SA_ADInventory_ComputersView."

        # Format the results
        return database.format_results(rows, columns)

    except pyodbc.Error as e:
        logger.error(f"Database error executing {tool_name} query: {e}", exc_info=True)
        # The first element of the error args is reported as the SQLSTATE
        sqlstate = e.args[0] if e.args else 'N/A'
        return f"Database error getting AD computers (SQLSTATE: {sqlstate}): {e}"
    except Exception as e:
        logger.error(f"Unexpected error in {tool_name}: {e}", exc_info=True)
        return f"An unexpected error occurred while getting AD computers: {e}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/netwrix/mcp-server-naa'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.