Skip to main content
Glama

BloodHound MCP

by MorDavid
BloodHound-MCP.py (25.2 kB)
"""MCP server exposing canned BloodHound (Neo4j) Cypher queries as tools.

Connection settings are read from the environment (optionally via a ``.env``
file): ``BLOODHOUND_URI``, ``BLOODHOUND_USERNAME`` and ``BLOODHOUND_PASSWORD``.
Every tool returns the dictionary produced by :func:`query_bloodhound`.
"""

import asyncio
import logging
import os

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from neo4j import GraphDatabase

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# BloodHound & Neo4j connection details
BLOODHOUND_URI = os.getenv("BLOODHOUND_URI", "bolt://localhost:7687")
BLOODHOUND_USERNAME = os.getenv("BLOODHOUND_USERNAME", "neo4j")
BLOODHOUND_PASSWORD = os.getenv("BLOODHOUND_PASSWORD", "bloodhound")

# Database names that are probed, in order, for the BloodHound graph.
DATABASES = ("neo4j", "bloodhound")

# Name of the database that passed verify_connectivity(); None until verified.
_active_database = None

# The password is deliberately left out of the log output.
logger.debug("Using Neo4j connection details:")
logger.debug("URI: %s", BLOODHOUND_URI)
logger.debug("User: %s", BLOODHOUND_USERNAME)

# Create Neo4j driver with BloodHound CE specific settings
driver = GraphDatabase.driver(
    BLOODHOUND_URI,
    auth=(BLOODHOUND_USERNAME, BLOODHOUND_PASSWORD),
    encrypted=False,
)


# Verify connection
def verify_connectivity():
    """Probe each candidate database and remember the first one that answers.

    A database counts as reachable when a ``User`` node count query runs
    without raising. The name of that database is stored so later queries go
    straight to it.

    Returns:
        True if one of the candidate databases answered, False otherwise.
    """
    global _active_database

    for db in DATABASES:
        try:
            logger.debug("Attempting to verify connection to database '%s'...", db)
            with driver.session(database=db) as session:
                result = session.run("MATCH (n:User) RETURN count(n) as count")
                count = result.single()["count"]
        except Exception as e:
            # Best-effort probe: any failure just means "try the next name".
            logger.debug("Failed to connect to database '%s': %s", db, e)
            continue

        _active_database = db
        logger.info("Successfully connected to database '%s'. Found %s users.", db, count)
        return True

    logger.error("Failed to connect to Neo4j: Could not connect to any database")
    return False


# Create FastMCP server for BloodHound
mcp = FastMCP("BH-Examples")


def _run_query(database, query):
    """Run ``query`` synchronously on ``database`` and return the rows as dicts."""
    with driver.session(database=database) as session:
        return [record.data() for record in session.run(query)]


@mcp.tool()
async def query_bloodhound(query: str):
    """Run a Cypher query against the BloodHound graph and return its rows.

    The statement is executed exactly as supplied (no read-only restriction is
    applied here). If connectivity was verified, only the verified database is
    queried; otherwise every candidate database is tried in order.

    Args:
        query: Cypher statement to execute.

    Returns:
        ``{"success": True, "data": [...]}`` with one dict per record, or
        ``{"success": False, "error": "..."}`` if every attempt failed.
    """
    candidates = [_active_database] if _active_database else DATABASES
    errors = []

    for db in candidates:
        try:
            # The neo4j driver used here is synchronous; run it in a worker
            # thread so a slow query does not stall the event loop.
            data = await asyncio.to_thread(_run_query, db, query)
        except Exception as e:
            errors.append((db, e))
            logger.debug("Query failed on database '%s': %s", db, e)
            continue

        logger.info("Query successful on database '%s'", db)
        return {"success": True, "data": data}

    if len(errors) == 1:
        message = str(errors[0][1])
    else:
        # Report every failure so a real Cypher error is not hidden behind an
        # unrelated error from another candidate database.
        message = "; ".join(f"{db}: {err}" for db, err in errors)

    logger.error("Query failed on all databases. Last error: %s", message)
    return {"success": False, "error": message}


# Domain Information
@mcp.tool()
async def find_all_domain_admins():
    """Return MemberOf paths from users/computers to groups whose objectid ends with '-512'."""
    query = """
    MATCH p = (t:Group)<-[:MemberOf*1..]-(a)
    WHERE (a:User or a:Computer) and t.objectid ENDS WITH '-512'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def map_domain_trusts():
    """Return TrustedBy relationships between Domain nodes."""
    query = """
    MATCH p = (:Domain)-[:TrustedBy]->(:Domain)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_tier_zero_locations():
    """Return Contains paths from domains down to nodes flagged highvalue."""
    query = """
    MATCH p = (t:Base)<-[:Contains*1..]-(:Domain)
    WHERE t.highvalue = true
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def map_ou_structure():
    """Return Contains paths from domains to OU nodes."""
    query = """
    MATCH p = (:Domain)-[:Contains*1..]->(:OU)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


# Dangerous Privileges
@mcp.tool()
async def find_dcsync_privileges():
    """Return principals with DCSync, AllExtendedRights or GenericAll on a domain."""
    query = """
    MATCH p=(:Base)-[:DCSync|AllExtendedRights|GenericAll]->(:Domain)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_foreign_group_memberships():
    """Return group memberships where member and group have different domainsid values."""
    query = """
    MATCH p=(s:Base)-[:MemberOf]->(t:Group)
    WHERE s.domainsid<>t.domainsid
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_domain_users_local_admins():
    """Return AdminTo edges from groups whose objectid ends with '-513' to computers."""
    query = """
    MATCH p=(s:Group)-[:AdminTo]->(:Computer)
    WHERE s.objectid ENDS WITH '-513'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_domain_users_laps_readers():
    """Return AllExtendedRights/ReadLAPSPassword edges from '-513' groups to computers."""
    query = """
    MATCH p=(s:Group)-[:AllExtendedRights|ReadLAPSPassword]->(:Computer)
    WHERE s.objectid ENDS WITH '-513'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_domain_users_high_value_paths():
    """Return shortest paths from '-513' groups to nodes flagged highvalue."""
    query = """
    MATCH p=shortestPath((s:Group)-[r*1..]->(t))
    WHERE t.highvalue = true AND s.objectid ENDS WITH '-513' AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_domain_users_workstation_rdp():
    """Return CanRDP edges from '-513' groups to computers whose OS does not contain 'SERVER'."""
    query = """
    MATCH p=(s:Group)-[:CanRDP]->(t:Computer)
    WHERE s.objectid ENDS WITH '-513' AND NOT toUpper(t.operatingsystem) CONTAINS 'SERVER'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_domain_users_server_rdp():
    """Return CanRDP edges from '-513' groups to computers whose OS contains 'SERVER'."""
    query = """
    MATCH p=(s:Group)-[:CanRDP]->(t:Computer)
    WHERE s.objectid ENDS WITH '-513' AND toUpper(t.operatingsystem) CONTAINS 'SERVER'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_domain_users_privileges():
    """Return every direct outbound edge from groups whose objectid ends with '-513'."""
    query = """
    MATCH p=(s:Group)-[r]->(:Base)
    WHERE s.objectid ENDS WITH '-513'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_domain_admin_non_dc_logons():
    """Return sessions of '-512' group members on computers outside the '-516' group."""
    query = """
    MATCH (s)-[:MemberOf*0..]->(g:Group)
    WHERE g.objectid ENDS WITH '-516'
    WITH COLLECT(s) AS exclude
    MATCH p = (c:Computer)-[:HasSession]->(:User)-[:MemberOf*1..]->(g:Group)
    WHERE g.objectid ENDS WITH '-512' AND NOT c IN exclude
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


# Kerberos Interaction
@mcp.tool()
async def find_kerberoastable_tier_zero():
    """Return enabled highvalue users with an SPN, excluding '-502', gMSA and MSA accounts."""
    query = """
    MATCH (u:User)
    WHERE u.hasspn=true
    AND u.enabled = true
    AND NOT u.objectid ENDS WITH '-502'
    AND NOT u.gmsa = true
    AND NOT u.msa = true
    AND u.highvalue = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_all_kerberoastable_users():
    """Return enabled users with an SPN, excluding '-502', gMSA and MSA accounts."""
    query = """
    MATCH (u:User)
    WHERE u.hasspn=true
    AND u.enabled = true
    AND NOT u.objectid ENDS WITH '-502'
    AND NOT u.gmsa = true
    AND NOT u.msa = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_kerberoastable_most_admin():
    """Return SPN-bearing users ordered by how many computers they reach via MemberOf/AdminTo."""
    query = """
    MATCH (u:User)
    WHERE u.hasspn = true
    AND u.enabled = true
    AND NOT u.objectid ENDS WITH '-502'
    AND NOT u.gmsa = true
    AND NOT u.msa = true
    MATCH (u)-[:MemberOf|AdminTo*1..]->(c:Computer)
    WITH DISTINCT u, COUNT(c) AS adminCount
    RETURN u
    ORDER BY adminCount DESC
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_asreproast_users():
    """Return enabled users with dontreqpreauth set."""
    query = """
    MATCH (u:User)
    WHERE u.dontreqpreauth = true AND u.enabled = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)


# Shortest Paths
@mcp.tool()
async def find_shortest_paths_unconstrained_delegation():
    """Return shortest paths to computers with unconstraineddelegation set."""
    query = """
    MATCH p=shortestPath((s)-[r*1..]->(t:Computer))
    WHERE t.unconstraineddelegation = true AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_paths_from_kerberoastable_to_da():
    """Return shortest paths from SPN-bearing users to groups whose objectid ends with '-512'."""
    query = """
    MATCH p=shortestPath((s:User)-[r*1..]->(t:Group))
    WHERE s.hasspn=true
    AND s.enabled = true
    AND NOT s.objectid ENDS WITH '-502'
    AND NOT s.gmsa = true
    AND NOT s.msa = true
    AND t.objectid ENDS WITH '-512'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_shortest_paths_to_tier_zero():
    """Return shortest paths from any node to nodes flagged highvalue."""
    query = """
    MATCH p=shortestPath((s)-[r*1..]->(t))
    WHERE t.highvalue = true AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_paths_from_domain_users_to_tier_zero():
    """Return shortest paths from '-513' groups to nodes flagged highvalue."""
    query = """
    MATCH p=shortestPath((s:Group)-[r*1..]->(t))
    WHERE t.highvalue = true AND s.objectid ENDS WITH '-513' AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_shortest_paths_to_domain_admins():
    """Return shortest paths from any Base node to groups whose objectid ends with '-512'."""
    query = """
    MATCH p=shortestPath((t:Group)<-[r*1..]-(s:Base))
    WHERE t.objectid ENDS WITH '-512' AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_paths_from_owned_objects():
    """Return shortest paths starting at nodes flagged owned."""
    query = """
    MATCH p=shortestPath((s:Base)-[r*1..]->(t:Base))
    WHERE s.owned = true AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


# Active Directory Certificate Services
@mcp.tool()
async def find_pki_hierarchy():
    """Return certificate-authority relationship chains that end at a domain."""
    query = """
    MATCH p=()-[:HostsCAService|IssuedSignedBy|EnterpriseCAFor|RootCAFor|TrustedForNTAuth|NTAuthStoreFor*..]->(:Domain)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_public_key_services():
    """Return everything contained under the 'Public Key Services' configuration container."""
    query = """
    MATCH p = (c:Container)-[:Contains*..]->(:Base)
    WHERE c.distinguishedname starts with 'CN=PUBLIC KEY SERVICES,CN=SERVICES,CN=CONFIGURATION,DC='
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_certificate_enrollment_rights():
    """Return principals with Enroll/GenericAll/AllExtendedRights on published templates."""
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_esc1_vulnerable_templates():
    """Return enrollable published templates matching the ESC1 property filter in the query."""
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    WHERE ct.enrolleesuppliessubject = True
    AND ct.authenticationenabled = True
    AND ct.requiresmanagerapproval = False
    AND (ct.authorizedsignatures = 0 OR ct.schemaversion = 1)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_esc2_vulnerable_templates():
    """Return enrollable published templates matching the ESC2 property filter in the query."""
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(c:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    WHERE c.requiresmanagerapproval = false
    AND (c.effectiveekus = [''] OR '2.5.29.37.0' IN c.effectiveekus)
    AND (c.authorizedsignatures = 0 OR c.schemaversion = 1)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_enrollment_agent_templates():
    """Return enrollable published templates with the EKUs listed in the query, or with no EKUs."""
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    WHERE '1.3.6.1.4.1.311.20.2.1' IN ct.effectiveekus
    OR '2.5.29.37.0' IN ct.effectiveekus
    OR SIZE(ct.effectiveekus) = 0
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_dcs_weak_certificate_binding():
    """Return DCs whose strongcertificatebindingenforcementraw value is 0 or 1."""
    query = """
    MATCH p = (s:Computer)-[:DCFor]->(:Domain)
    WHERE s.strongcertificatebindingenforcementraw = 0 OR s.strongcertificatebindingenforcementraw = 1
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_inactive_tier_zero_principals():
    """Return enabled highvalue principals with no logon in, and created before, the last 60 days."""
    query = """
    WITH 60 as inactive_days
    MATCH (n:Base)
    WHERE n.highvalue = true
    AND n.enabled = true
    AND n.lastlogontimestamp < (datetime().epochseconds - (inactive_days * 86400))
    AND n.lastlogon < (datetime().epochseconds - (inactive_days * 86400))
    AND n.whencreated < (datetime().epochseconds - (inactive_days * 86400))
    AND NOT n.name STARTS WITH 'AZUREADKERBEROS.'
    AND NOT n.objectid ENDS WITH '-500'
    AND NOT n.name STARTS WITH 'AZUREADSSOACC.'
    RETURN n
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_tier_zero_without_smartcard():
    """Return enabled highvalue users without smartcardrequired, skipping known sync accounts."""
    query = """
    MATCH (u:User)
    WHERE u.highvalue = true
    AND u.enabled = true
    AND u.smartcardrequired = false
    AND NOT u.name STARTS WITH 'MSOL_'
    AND NOT u.name STARTS WITH 'PROVAGENTGMSA'
    AND NOT u.name STARTS WITH 'ADSYNCMSA_'
    RETURN u
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_domains_with_machine_quota():
    """Return domains whose machineaccountquota is greater than zero."""
    query = """
    MATCH (d:Domain)
    WHERE d.machineaccountquota > 0
    RETURN d
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_smartcard_dont_expire_domains():
    """Return domains not expiring smartcard-only passwords that contain enabled smartcard accounts."""
    query = """
    MATCH (s:Domain)-[:Contains*1..]->(t:Base)
    WHERE s.expirepasswordsonsmartcardonlyaccounts = false
    AND t.enabled = true
    AND t.smartcardrequired = true
    RETURN s
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_two_way_forest_trust_delegation():
    """Return two-way forest trusts that have TGT delegation enabled."""
    query = """
    MATCH p=(n:Domain)-[r:TrustedBy]->(m:Domain)
    WHERE (m)-[:TrustedBy]->(n)
    AND r.trusttype = 'Forest'
    AND r.tgtdelegationenabled = true
    RETURN p
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_unsupported_operating_systems():
    """Return computers whose operatingsystem matches the legacy Windows pattern in the query."""
    query = """
    MATCH (c:Computer)
    WHERE c.operatingsystem =~ '(?i).*Windows.* (2000|2003|2008|2012|xp|vista|7|8|me|nt).*'
    RETURN c
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_users_with_no_password_required():
    """Return users with passwordnotreqd set."""
    query = """
    MATCH (u:User)
    WHERE u.passwordnotreqd = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_users_password_not_rotated():
    """Return users whose pwdlastset is more than 365 days old (ignoring -1 and 0)."""
    query = """
    WITH 365 as days_since_change
    MATCH (u:User)
    WHERE u.pwdlastset < (datetime().epochseconds - (days_since_change * 86400))
    AND NOT u.pwdlastset IN [-1.0, 0.0]
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_nested_tier_zero_groups():
    """Return groups nested in highvalue groups, excluding '-512' and '-519' member groups."""
    query = """
    MATCH p=(t:Group)<-[:MemberOf*..]-(s:Group)
    WHERE t.highvalue = true
    AND NOT s.objectid ENDS WITH '-512'
    AND NOT s.objectid ENDS WITH '-519'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_disabled_tier_zero_principals():
    """Return disabled highvalue principals, excluding '-502' and '-500' accounts."""
    query = """
    MATCH (n:Base)
    WHERE n.highvalue = true
    AND n.enabled = false
    AND NOT n.objectid ENDS WITH '-502'
    AND NOT n.objectid ENDS WITH '-500'
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_principals_reversible_encryption():
    """Return principals with encryptedtextpwdallowed set."""
    query = """
    MATCH (n:Base)
    WHERE n.encryptedtextpwdallowed = true
    RETURN n
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_principals_des_only_kerberos():
    """Return enabled principals with usedeskeyonly set."""
    query = """
    MATCH (n:Base)
    WHERE n.enabled = true AND n.usedeskeyonly = true
    RETURN n
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_principals_weak_kerberos_encryption():
    """Return principals whose supportedencryptiontypes include a DES or RC4 type."""
    query = """
    MATCH (u:Base)
    WHERE 'DES-CBC-CRC' IN u.supportedencryptiontypes
    OR 'DES-CBC-MD5' IN u.supportedencryptiontypes
    OR 'RC4-HMAC-MD5' IN u.supportedencryptiontypes
    RETURN u
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_tier_zero_non_expiring_passwords():
    """Return enabled highvalue users with pwdneverexpires set."""
    query = """
    MATCH (u:User)
    WHERE u.enabled = true AND u.pwdneverexpires = true AND u.highvalue = true
    RETURN u
    LIMIT 100
    """
    return await query_bloodhound(query)


# NTLM Relay Attacks
@mcp.tool()
async def find_ntlm_relay_edges():
    """Return CoerceAndRelayNTLM* edges (LDAP, LDAPS, ADCS, SMB) between principals."""
    query = """
    MATCH p = (n:Base)-[:CoerceAndRelayNTLMToLDAP|CoerceAndRelayNTLMToLDAPS|CoerceAndRelayNTLMToADCS|CoerceAndRelayNTLMToSMB]->(:Base)
    RETURN p
    LIMIT 500
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_esc8_vulnerable_cas():
    """Return enterprise CAs with hasvulnerableendpoint set."""
    query = """
    MATCH (n:EnterpriseCA)
    WHERE n.hasvulnerableendpoint=true
    RETURN n
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_computers_outbound_ntlm_deny():
    """Return computers with restrictoutboundntlm set."""
    query = """
    MATCH (c:Computer)
    WHERE c.restrictoutboundntlm = True
    RETURN c
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_computers_in_protected_users():
    """Return MemberOf paths into groups whose objectid ends with '-525'."""
    query = """
    MATCH p = (:Base)-[:MemberOf*1..]->(g:Group)
    WHERE g.objectid ENDS WITH "-525"
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_dcs_vulnerable_ntlm_relay():
    """Return DCs whose LDAP signing / LDAPS EPA settings match the relay filter in the query."""
    query = """
    MATCH p = (dc:Computer)-[:DCFor]->(:Domain)
    WHERE (dc.ldapavailable = True AND dc.ldapsigning = False)
    OR (dc.ldapsavailable = True AND dc.ldapsepa = False)
    OR (dc.ldapavailable = True AND dc.ldapsavailable = True AND dc.ldapsigning = False and dc.ldapsepa = True)
    RETURN p
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_computers_webclient_running():
    """Return computers with webclientrunning set."""
    query = """
    MATCH (c:Computer)
    WHERE c.webclientrunning = True
    RETURN c
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_computers_no_smb_signing():
    """Return computers with smbsigning set to False."""
    query = """
    MATCH (n:Computer)
    WHERE n.smbsigning = False
    RETURN n
    """
    return await query_bloodhound(query)


# Azure - General
@mcp.tool()
async def find_global_administrators():
    """Return AZGlobalAdmin paths from Azure principals to tenants."""
    query = """
    MATCH p = (:AZBase)-[:AZGlobalAdmin*1..]->(:AZTenant)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_high_privileged_role_members():
    """Return direct and one-level-nested members of the privileged Azure roles named in the query."""
    query = """
    MATCH p=(t:AZRole)<-[:AZHasRole|AZMemberOf*1..2]-(:AZBase)
    WHERE t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


# Azure - Shortest Paths
@mcp.tool()
async def find_paths_from_entra_to_tier_zero():
    """Return shortest paths from AZUser nodes to highvalue nodes named like a privileged role."""
    query = """
    MATCH p=shortestPath((s:AZUser)-[r*1..]->(t:AZBase))
    WHERE t.highvalue = true
    AND t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)'
    AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_paths_to_privileged_roles():
    """Return shortest paths from Azure principals to the privileged roles named in the query."""
    query = """
    MATCH p=shortestPath((s:AZBase)-[r*1..]->(t:AZRole))
    WHERE t.name =~ '(?i)(Global Administrator|User Access Administrator|Privileged Role Administrator|Privileged Authentication Administrator|Partner Tier1 Support|Partner Tier2 Support)'
    AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_paths_from_azure_apps_to_tier_zero():
    """Return shortest paths from AZApp nodes to Azure nodes flagged highvalue."""
    query = """
    MATCH p=shortestPath((s:AZApp)-[r*1..]->(t:AZBase))
    WHERE t.highvalue = true AND s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_paths_to_azure_subscriptions():
    """Return shortest paths from Azure principals to AZSubscription nodes."""
    query = """
    MATCH p=shortestPath((s:AZBase)-[r*1..]->(t:AZSubscription))
    WHERE s<>t
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


# Azure - Microsoft Graph
@mcp.tool("sp_app_role_grant")
async def find_service_principals_with_app_role_grant():
    """Return service principals with an AZMGGrantAppRoles edge to a tenant."""
    query = """
    MATCH p=(:AZServicePrincipal)-[:AZMGGrantAppRoles]->(:AZTenant)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool("find_sp_graph_assignments")
async def find_service_principals_with_graph_assignments():
    """Return service principals holding the ReadWrite Microsoft Graph edges listed in the query."""
    query = """
    MATCH p=(:AZServicePrincipal)-[:AZMGAppRoleAssignment_ReadWrite_All|AZMGApplication_ReadWrite_All|AZMGDirectory_ReadWrite_All|AZMGGroupMember_ReadWrite_All|AZMGGroup_ReadWrite_All|AZMGRoleManagement_ReadWrite_Directory|AZMGServicePrincipalEndpoint_ReadWrite_All]->(:AZServicePrincipal)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


# Azure - Hygiene
@mcp.tool()
async def find_foreign_tier_zero_principals():
    """Return highvalue service principals whose app owner organization differs from the tenant."""
    query = """
    MATCH (n:AZServicePrincipal)
    WHERE n.highvalue = true
    AND NOT toUpper(n.appownerorganizationid) = toUpper(n.tenantid)
    AND n.appownerorganizationid CONTAINS '-'
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_synced_tier_zero_principals():
    """Return on-prem-synced Azure principals whose matching AD object is flagged highvalue."""
    query = """
    MATCH (ENTRA:AZBase)
    MATCH (AD:Base)
    WHERE ENTRA.onpremsyncenabled = true
    AND ENTRA.onpremid = AD.objectid
    AND AD.highvalue = true
    RETURN ENTRA
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_external_tier_zero_users():
    """Return highvalue AZUser nodes whose name contains '#EXT#@'."""
    query = """
    MATCH (n:AZUser)
    WHERE n.highvalue = true AND n.name CONTAINS '#EXT#@'
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_disabled_azure_tier_zero_principals():
    """Return disabled Azure principals flagged highvalue."""
    query = """
    MATCH (n:AZBase)
    WHERE n.highvalue = true AND n.enabled = false
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_devices_unsupported_os():
    """Return Windows AZDevice nodes whose OS version matches the build list in the query."""
    query = """
    MATCH (n:AZDevice)
    WHERE n.operatingsystem CONTAINS 'WINDOWS'
    AND n.operatingsystemversion =~ '(10.0.19044|10.0.22000|10.0.19043|10.0.19042|10.0.19041|10.0.18363|10.0.18362|10.0.17763|10.0.17134|10.0.16299|10.0.15063|10.0.14393|10.0.10586|10.0.10240|6.3.9600|6.2.9200|6.1.7601|6.0.6200|5.1.2600|6.0.6003|5.2.3790|5.0.2195).?.*'
    RETURN n
    LIMIT 100
    """
    return await query_bloodhound(query)


# Azure - Cross Platform Attack Paths
@mcp.tool()
async def find_entra_users_in_domain_admins():
    """Return Entra users synced to AD users that are direct members of a '-512' group."""
    query = """
    MATCH p = (:AZUser)-[:SyncedToADUser]->(:User)-[:MemberOf]->(t:Group)
    WHERE t.objectid ENDS WITH '-512'
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_onprem_users_owning_entra_objects():
    """Return on-prem users whose synced Entra user owns an Azure object."""
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZOwns]->(:AZBase)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_onprem_users_in_entra_groups():
    """Return on-prem users whose synced Entra user is a member of an Azure group."""
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool("templates_no_security_ext")
async def find_templates_no_security_extension():
    """Return enrollable published templates with nosecurityextension set."""
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(:EnterpriseCA)
    WHERE ct.nosecurityextension = true
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool("templates_with_user_san")
async def find_templates_with_user_specified_san():
    """Return enrollable templates published to a CA with isuserspecifiessanenabled set."""
    query = """
    MATCH p = (:Base)-[:Enroll|GenericAll|AllExtendedRights]->(ct:CertTemplate)-[:PublishedTo]->(eca:EnterpriseCA)
    WHERE eca.isuserspecifiessanenabled = True
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool()
async def find_ca_administrators():
    """Return principals with ManageCertificates or ManageCA on an enterprise CA."""
    query = """
    MATCH p = (:Base)-[:ManageCertificates|ManageCA]->(:EnterpriseCA)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool("onprem_users_direct_entra_roles")
async def find_onprem_users_with_direct_entra_roles():
    """Return on-prem users whose synced Entra user directly holds an Azure role."""
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZHasRole]->(:AZRole)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool("onprem_users_group_entra_roles")
async def find_onprem_users_with_group_entra_roles():
    """Return on-prem users whose synced Entra user holds an Azure role through a group."""
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)-[:AZHasRole]->(:AZRole)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool("onprem_users_direct_azure_roles")
async def find_onprem_users_with_direct_azure_roles():
    """Return on-prem users whose synced Entra user directly holds one of the listed Azure edges."""
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZOwner|AZUserAccessAdministrator|AZGetCertificates|AZGetKeys|AZGetSecrets|AZAvereContributor|AZKeyVaultContributor|AZContributor|AZVMAdminLogin|AZVMContributor|AZAKSContributor|AZAutomationContributor|AZLogicAppContributor|AZWebsiteContributor]->(:AZBase)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


@mcp.tool("onprem_users_group_azure_roles")
async def find_onprem_users_with_group_azure_roles():
    """Return on-prem users whose synced Entra user holds a listed Azure edge through a group."""
    query = """
    MATCH p = (:User)-[:SyncedToEntraUser]->(:AZUser)-[:AZMemberOf]->(:AZGroup)-[:AZOwner|AZUserAccessAdministrator|AZGetCertificates|AZGetKeys|AZGetSecrets|AZAvereContributor|AZKeyVaultContributor|AZContributor|AZVMAdminLogin|AZVMContributor|AZAKSContributor|AZAutomationContributor|AZLogicAppContributor|AZWebsiteContributor]->(:AZBase)
    RETURN p
    LIMIT 1000
    """
    return await query_bloodhound(query)


if __name__ == "__main__":
    try:
        if verify_connectivity():
            logger.info("Starting MCP server...")
            mcp.run(transport="stdio")
        else:
            logger.error("Failed to establish Neo4j connection. Please check your credentials and connection settings.")
    finally:
        # Release the driver's connections on every exit path, including a
        # failed connectivity check.
        driver.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MorDavid/BloodHound-MCP-AI'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.