#!/usr/bin/env python3
"""
MCP Server for Bloodhound Community Edition
This server provides an interface between an LLM and the Bloodhound CE data
"""
import argparse
import json
import logging
import os
from typing import Any, Dict, List, Optional
from dotenv import load_dotenv
# Import FastMCP
from mcp.server.fastmcp import FastMCP
# Import Bloodhound API client
from lib.bloodhound_api import BloodhoundAPI, BloodhoundAPIError, BloodhoundConnectionError
# Set up logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Initialize the MCP server and Bloodhound API client
mcp = FastMCP("bloodhound_mcp")
bloodhound_api = BloodhoundAPI()
# Create Resources for the LLM
@mcp.resource("bloodhound://cypher/examples")
def cypher_examples() -> str:
"""Provides example Cypher queries for common BloodHound operations"""
examples = """
BloodHound Cypher Query Examples
================================
Syntax Notes:
-------------
- BloodHound uses Neo4j as its graph database
- Nodes are represented with parentheses: (n:NodeType)
- Relationships are represented with square brackets: [r:RELATIONSHIP_TYPE]
- Patterns chain nodes and relationships: (n)-[r]->(m)
- The RETURN clause specifies what to output
Common Node Types:
-----------------
- AZTenant: Azure AD Tenant
- AZBase: Base Azure object type
- AZUser: Azure user
- AZGroup: Azure group
- AZGlobalAdmin: Global Administrator role
- AZApp: Azure application/service principal
- Computer: Active Directory computer
- User: Active Directory user
- Group: Active Directory group
Example Queries:
---------------
1. Find all Azure Global Admins:
MATCH p = (:AZBase)-[:AZGlobalAdmin*1..]->(:AZTenant)
RETURN p
2. Find Azure users that have administrative roles:
MATCH p=(u:AZUser)-[r:AZGlobalAdmin|AZPrivilegedRoleAdmin]->(t:AZTenant)
RETURN u.displayname, u.objectid, u.usertype, type(r) as role_type
3. Find Azure Service Principals with dangerous permissions:
MATCH p=(sp:AZServicePrincipal)-[r:AZApplicationAdministrator|AZCloudApplicationAdministrator]->(t:AZTenant)
RETURN sp.displayname, sp.objectid, type(r) as permission
4. Find Azure users with direct dangerous permissions:
MATCH p=(u:AZUser)-[r:AZGlobalAdmin|AZPrivilegedRoleAdmin|AZApplicationAdministrator]->(t:AZTenant)
RETURN u.displayname, u.objectid, type(r) as role
5. Identify potential attack paths to Global Admin:
MATCH p=shortestPath((n:AZUser {name:'targetuser@domain.com'})-[*1..]->(a:AZGlobalAdmin))
RETURN p
6. Find all Domain Admins in Active Directory:
MATCH p=(n)-[r:MemberOf*1..]->(g:Group {name:"DOMAIN ADMINS@DOMAIN.COM"})
RETURN p
7. Find Azure users who can reset passwords:
MATCH p=(u:AZUser)-[r:AZResetPassword]->(target:AZUser)
RETURN u.displayname, count(target) as can_reset_count
ORDER BY can_reset_count DESC
8. Find kerberoastable users:
MATCH (u:User)
WHERE u.hasspn=true
AND u.enabled = true
AND NOT u.objectid ENDS WITH '-502'
AND NOT COALESCE(u.gmsa, false) = true
AND NOT COALESCE(u.msa, false) = true
RETURN u
9. Find paths from owned users to Domain Admin:
MATCH p=shortestPath((s:Base)-[:Owns|GenericAll|GenericWrite|WriteOwner|WriteDacl|MemberOf|ForceChangePassword|AllExtendedRights|AddMember|HasSession|GPLink|AllowedToDelegate|CoerceToTGT|AllowedToAct|AdminTo|CanPSRemote|CanRDP|ExecuteDCOM|HasSIDHistory|AddSelf|DCSync|ReadLAPSPassword|ReadGMSAPassword|DumpSMSAPassword|SQLAdmin|AddAllowedToAct|WriteSPN|AddKeyCredentialLink|SyncLAPSPassword|WriteAccountRestrictions|WriteGPLink|GoldenCert|ADCSESC1|ADCSESC3|ADCSESC4|ADCSESC6a|ADCSESC6b|ADCSESC9a|ADCSESC9b|ADCSESC10a|ADCSESC10b|ADCSESC13|SyncedToEntraUser|CoerceAndRelayNTLMToSMB|CoerceAndRelayNTLMToADCS|WriteOwnerLimitedRights|OwnsLimitedRights|CoerceAndRelayNTLMToLDAP|CoerceAndRelayNTLMToLDAPS|Contains|DCFor|TrustedBy*1..]->(t:Base))
WHERE COALESCE(s.system_tags, '') CONTAINS 'owned' AND s<>t
RETURN p
10. Find Azure users with the most permissions:
MATCH (u:AZUser)
OPTIONAL MATCH (u)-[r]->(t)
WITH u, count(r) as num_permissions
RETURN u.displayname, num_permissions
ORDER BY num_permissions DESC
LIMIT 10
"""
return examples
@mcp.resource("bloodhound://cypher/patterns")
def cypher_patterns() -> str:
"""Provides common patterns for BloodHound Cypher queries"""
patterns = """
BloodHound Cypher Query Patterns
===============================
Finding Administrative Users:
---------------------------
Pattern: MATCH p=(base)-[relationship*1..]->(target)
For Azure environments:
- Base: :AZBase, :AZUser, :AZServicePrincipal
- Relationships: :AZGlobalAdmin, :AZPrivilegedRoleAdmin, :AZApplicationAdministrator
- Target: :AZTenant, :AZApp
For AD environments:
- Base: :User, :Computer, :Group
- Relationships: :MemberOf, :AdminTo, :GenericAll
- Target: :Group, :Computer, :Domain
Path Analysis:
-------------
Use shortestPath() for finding the most direct attack path:
MATCH p=shortestPath((start)-[*1..]->(end))
WHERE /* add conditions */
RETURN p
To find all possible paths within a certain length:
MATCH p=(start)-[*1..5]->(end)
WHERE /* add conditions */
RETURN p
Permission Analysis:
-------------------
To count permissions for an object:
MATCH (obj)-[r]->(target)
WITH obj, count(r) as permission_count
RETURN obj, permission_count
ORDER BY permission_count DESC
Relationship Types:
-----------------
Azure:
- AZGlobalAdmin: Global Admin role
- AZPrivilegedRoleAdmin: Privileged Role Admin
- AZResetPassword: Can reset passwords
- AZOwns: Owns the object
- AZExecuteCommand: Can execute commands
Active Directory:
- MemberOf: Group membership
- AdminTo: Administrative access
- GenericAll: Full control
- ForceChangePassword: Can force password change
- DCSync: Can perform DCSync
"""
return patterns
@mcp.resource("bloodhound://custom-nodes/opengraph-guide")
def custom_nodes_opengraph_guide() -> str:
"""Provides comprehensive guidance for working with BloodHound custom nodes and OpenGraph"""
guide = """
BloodHound Custom Nodes & OpenGraph Guide
=========================================
Overview:
--------
Custom nodes in BloodHound represent objects outside of standard Active Directory and Azure types.
They use the OpenGraph framework to extend BloodHound's graph model with custom entities and relationships.
Key Concepts:
------------
1. Custom Node Types: User-defined node categories (e.g., SQLServer, WebApp, NetworkDevice)
2. OpenGraph Schema: JSON structure defining nodes and edges for ingestion
3. Visual Configuration: Icons and colors for custom node display
4. Relationships: Directed edges connecting custom nodes to existing AD/Azure objects
Best Practices:
--------------
1. Universally Unique Identifiers:
- Every node must have a unique identifier
- Use GUIDs, SIDs, certificate thumbprints, or similar
- Avoid simple names that could collide
2. Node and Edge Design:
- Every edge must be directed and one-way
- Edge direction should match "access or attack" flow
- Create paths connecting non-adjacent nodes
- Think "map of one-way streets" for graph traversal
3. Design Philosophy:
- "Begin with the end in mind" - understand your attack path goals
- Ensure your model supports effective path discovery
- If not creating multi-node paths, consider using a relational database instead
Icon Configuration:
------------------
- Use Font Awesome free, solid icons only
- Specify icon name without 'fa-' prefix (e.g., "database" not "fa-database")
- Colors in #RGB or #RRGGBB format
- Example:
{
"icon": {
"type": "font-awesome",
"name": "database",
"color": "#FF0000"
}
}
OpenGraph Schema Structure:
--------------------------
Minimal JSON structure for data ingestion:
{
"graph": {
"nodes": [
{
"id": "unique-identifier",
"kinds": ["CustomNodeType", "Base"],
"properties": {
"name": "Object Name",
"custom_property": "value"
}
}
],
"edges": [
{
"kind": "CustomRelationship",
"start": {"value": "source-node-id"},
"end": {"value": "target-node-id"},
"properties": {
"weight": 1
}
}
]
}
}
Node Properties Rules:
---------------------
- Properties must be primitive types (strings, numbers, booleans)
- Arrays must be homogeneous (all elements same type)
- No nested objects allowed
- First 'kind' determines visual representation
Common Custom Node Examples:
---------------------------
1. Database Servers:
- SQLServer, MySQL, PostgreSQL nodes
- Relationships: AdminTo, ConnectsTo, ReadFrom
2. Network Infrastructure:
- Router, Switch, Firewall nodes
- Relationships: Routes, Filters, Connects
3. Applications:
- WebApp, Service, API nodes
- Relationships: Authenticates, Authorizes, Accesses
4. Security Appliances:
- SIEM, EDR, Proxy nodes
- Relationships: Monitors, Blocks, Logs
Integration with Existing BloodHound:
------------------------------------
- Custom nodes can have relationships to/from AD users, computers, groups
- Use standard BloodHound relationship types where applicable
- Create new relationship types for custom scenarios
- Maintain consistency with BloodHound's attack path methodology
Example: SQL Server Integration
------------------------------
1. Create SQLServer custom node type with database icon
2. Define relationships:
- User -[SQLAdmin]-> SQLServer
- SQLServer -[ConnectsTo]-> Database
- SQLServer -[RunsAs]-> ServiceAccount
3. This enables attack paths: User -> SQLServer -> ServiceAccount -> Domain
Validation and Testing:
----------------------
- Always validate icon configurations before creation
- Test data ingestion with small samples first
- Verify attack paths work as expected in BloodHound UI
- Check that custom nodes integrate properly with existing data
Common Pitfalls to Avoid:
------------------------
- Don't create isolated nodes without relationships
- Avoid complex nested properties (use separate nodes instead)
- Don't duplicate existing BloodHound functionality
- Ensure custom relationships have clear security implications
- Don't create one-to-one mappings (better suited for relational DBs)
MSSQLHound Example:
------------------
Reference implementation showing:
- Server-level and database-level objects
- Rich permission metadata
- Complex relationship hierarchies
- Practical attack path scenarios
Use MSSQLHound as a template for creating similar custom node implementations
for other technologies and environments.
"""
return guide
@mcp.resource("bloodhound://custom-nodes/examples")
def custom_nodes_examples() -> str:
"""Provides practical examples of custom node implementations"""
examples = """
BloodHound Custom Nodes Implementation Examples
==============================================
1. SQL Server Environment
-------------------------
Custom Node Types:
- MSSQL_Server: SQL Server instance
- MSSQL_Database: Individual database
- MSSQL_Login: Server-level login
- MSSQL_User: Database-level user
Icon Configuration:
{
"MSSQL_Server": {
"icon": {
"type": "font-awesome",
"name": "server",
"color": "#CC2936"
}
},
"MSSQL_Database": {
"icon": {
"type": "font-awesome",
"name": "database",
"color": "#4472C4"
}
}
}
Example Relationships:
- User -[SQLAdmin]-> MSSQL_Server
- MSSQL_Server -[Contains]-> MSSQL_Database
- MSSQL_Login -[CanAuth]-> MSSQL_Server
2. Web Application Stack
-----------------------
Custom Node Types:
- WebApp: Web application
- WebServer: Web server (IIS, Apache)
- AppPool: Application pool
- WebSite: IIS website
Icon Configuration:
{
"WebApp": {
"icon": {
"type": "font-awesome",
"name": "globe",
"color": "#00B04F"
}
},
"WebServer": {
"icon": {
"type": "font-awesome",
"name": "server",
"color": "#FF6600"
}
}
}
Example Relationships:
- WebApp -[RunsOn]-> WebServer
- WebServer -[RunsAs]-> ServiceAccount
- User -[WebAdmin]-> WebApp
3. Network Infrastructure
-------------------------
Custom Node Types:
- NetworkDevice: Routers, switches, firewalls
- VLAN: Virtual LAN segments
- NetworkSegment: Network subnets
Icon Configuration:
{
"NetworkDevice": {
"icon": {
"type": "font-awesome",
"name": "network-wired",
"color": "#8B4513"
}
},
"VLAN": {
"icon": {
"type": "font-awesome",
"name": "project-diagram",
"color": "#9932CC"
}
}
}
Example Relationships:
- NetworkDevice -[Routes]-> NetworkSegment
- Computer -[MemberOf]-> VLAN
- User -[NetworkAdmin]-> NetworkDevice
4. Cloud Resources (AWS/GCP)
----------------------------
Custom Node Types:
- CloudInstance: VM instances
- CloudRole: IAM roles
- CloudBucket: Storage buckets
- CloudFunction: Serverless functions
Icon Configuration:
{
"CloudInstance": {
"icon": {
"type": "font-awesome",
"name": "cloud",
"color": "#FF9900"
}
},
"CloudRole": {
"icon": {
"type": "font-awesome",
"name": "user-tag",
"color": "#232F3E"
}
}
}
Example OpenGraph Schema:
{
"graph": {
"nodes": [
{
"id": "i-1234567890abcdef0",
"kinds": ["CloudInstance", "Base"],
"properties": {
"name": "web-server-01",
"instance_type": "t3.medium",
"region": "us-east-1",
"status": "running"
}
}
],
"edges": [
{
"kind": "AssumeRole",
"start": {"value": "user-id"},
"end": {"value": "role-arn"}
}
]
}
}
5. Security Tools Integration
----------------------------
Custom Node Types:
- SIEM: Security information and event management
- EDR: Endpoint detection and response
- Vulnerability: Security vulnerabilities
- SecurityControl: Security controls/policies
Icon Configuration:
{
"SIEM": {
"icon": {
"type": "font-awesome",
"name": "shield-alt",
"color": "#DC143C"
}
},
"Vulnerability": {
"icon": {
"type": "font-awesome",
"name": "exclamation-triangle",
"color": "#FF4500"
}
}
}
Attack Path Examples:
- User -[HasVulnerability]-> Vulnerability -[AffectsService]-> Service
- Computer -[MonitoredBy]-> EDR -[ReportsTo]-> SIEM
Complete Example: Custom Node Creation
-------------------------------------
Step 1: Define custom types
custom_types = {
"SQLServer": {
"icon": {
"type": "font-awesome",
"name": "database",
"color": "#CC2936"
}
},
"WebApp": {
"icon": {
"type": "font-awesome",
"name": "globe",
"color": "#00B04F"
}
}
}
Step 2: Create nodes via API
POST /api/v2/custom-nodes
{
"custom_types": custom_types
}
Step 3: Prepare OpenGraph data
opengraph_data = {
"graph": {
"nodes": [
{
"id": "sql01.company.com",
"kinds": ["SQLServer", "Base"],
"properties": {
"name": "SQL01",
"version": "SQL Server 2019",
"instance": "MSSQLSERVER"
}
}
],
"edges": [
{
"kind": "SQLAdmin",
"start": {"value": "user-admin-id"},
"end": {"value": "sql01.company.com"}
}
]
}
}
Step 4: Ingest data through BloodHound's data ingestion API
This creates attack paths showing how users can compromise SQL servers
and potentially pivot to other systems in the environment.
"""
return examples
# Define prompts
@mcp.prompt()
def bloodhound_assistant() -> str:
return """You are an AI assistant that helps security professionals analyze Active Directory environments using BloodHound data.
You can provide and analyze information from an organization's Active Directory environment.
You have the capability to search for an object by name or Object ID, you just need to specify the object type you are searching for.
You can search for the following object types:
- For Active Directory: User, Computer, Group, GPO, OU, Domain
- For Azure: AZUser, AZGroup, AZDevice, etc.
It is recommended to perform a search when asked about a user first before trying to brute force or guess the user's username or Object ID.
You can also retrieve information on the domains within the BloodHound database.
You can analyze the domains within the BloodHound database.
Specifics on what information you can provide on and analyze for a domain include:
- Users
- Groups
- Computers
- Controllers
- Group Policy Objects (GPOs)
- Organizational Units (OUs)
- DC Syncers
- Foreign Admins
- Foreign GPO Controllers
- Foreign Groups
- Foreign Users
- Inbound Trusts
- Linked GPOs
- Outbound Trusts
You also have the ability to look further into individual users within a domain. You can analyze which users are prime targets and how they can possibly be exploited.
By combining all of the information below, you can provide an in-depth analysis of a user.
Information on the users includes:
- User's general information
- Administrative rights
- Constrained delegation rights
- Controllables
- Controllers
- DCOM rights
- Group memberships
- Remote PowerShell rights
- RDP rights
- Sessions
- SQL administrative rights
You have the capability to look further into the groups within a domain. You can analyze group memberships and how they can be exploited.
By combining all of the information below, you can provide an in-depth analysis of a group. Additionally, you can identify groups and their permissions to help determine attack paths.
Information on the groups includes:
- Group's general information
- Administrative rights
- Constrained delegation rights
- Controllables
- Controllers
- DCOM rights
- Group memberships
- Remote PowerShell rights
- RDP rights
- Sessions
- SQL administrative rights
You can also look into the computers within a domain. You can analyze the computer memberships and how they can be exploited.
By combining all of the information below, you can provide an in-depth analysis of a computer.
Information on the computers includes:
- Computer's general information
- Administrative rights (both the rights the computer has over other machines and the rights other security principals have over the computer)
- Constrained delegation rights(both the rights the computer has over other machines and the rights other security principals have over the computer)
- Controllables
- Controllers
- DCOM rights (both the rights the computer has over other machines and the rights other security principals have over the computer)
- Group memberships
- Remote PowerShell rights (both the rights the computer has over other machines and the rights other security principals have over the computer)
- RDP rights (both the rights the computer has over other machines and the rights other security principals have over the computer)
- Sessions
- SQL administrative rights
You also have visibility into the organizational units within a domain. By analyzing organizational units, you can identify the structure of a domain and how it can be exploited.
By combining all of the information below, you can provide an in-depth analysis of an organizational unit.
Information on organizational units includes:
- Organizational unit's general information
- Computers within the organizational unit
- Groups within the organizational unit
- Users within the organizational unit
- Security principals that have control over the organizational unit
- Tier-zero principals associated with the OU
- Users that the OU is applied to
Another capability you have is to look into the group policy objects within a domain. By analyzing group policy objects you can identify the structure of a domain and how it can be exploited.
By combining all of the information below, you can provide an in-depth analysis of a group policy object.
Information on the group policy objects includes:
- Group policy object's general information
- Computers that the Group Policy is applied to
- Security Principals that have control over the Group Policy
- Organizational Units that the Group Policy is applied to
- The tier-zero principals associated with the GPO
- The users that the GPOs are applied to
To assist further both in defensive and offensive security purposes, you have the capability to perform graph searches within the BloodHound database.
You can search for specific objects using graph with fuzzy searching.
You can also search for the shortest path between two objects in the BloodHound database.
You can also analyze certificate templates and certificate authorities within the domain.
These components play a critical role in the enterprise PKI infrastructure and can be abused for privilege escalation if misconfigured.
You can provide information on:
- Certificate Templates - These define the properties of certificates that can be issued
- Root Certificate Authorities (CAs) - The trusted root certificates in the domain
- Enterprise Certificate Authorities - CAs that issue certificates within the organization
- AIA Certificate Authorities - CAs that provide Authority Information Access
For each of these entities, you can analyze who controls them, which is critical for identifying potential ADCS-based attack vectors like ESC1 (Misconfigured Certificate Templates), ESC2 (Vulnerable Certificate Template Access Control), and other ADCS attacks.
You also have the capability to use Cypher queries to perform advanced searches and analysis within the BloodHound database.
When creating Cypher queries for BloodHound, remember:
1. BloodHound uses specific node labels for different object types:
- Active Directory: User, Computer, Group, Domain, OU, GPO
- Azure: AZUser, AZGroup, AZApp, AZServicePrincipal, AZTenant
- Custom Nodes: User-defined types like SQLServer, WebApp, NetworkDevice (via OpenGraph)
You have comprehensive capabilities for managing BloodHound data quality, custom nodes, and asset groups:
Data Quality Management:
- Monitor database completeness statistics
- Track data quality metrics over time for AD domains and Azure tenants
- Analyze platform-wide data quality trends
- Identify gaps in data collection coverage
Custom Nodes & OpenGraph:
- Create and manage custom node types for objects outside AD/Azure
- Configure visual display (icons and colors) for custom nodes
- Validate icon configurations before deployment
- Examples: SQL servers, web applications, network devices, cloud resources
- Integrate custom nodes with existing BloodHound attack path analysis
- Follow OpenGraph best practices for schema design and relationships
Asset Groups Management:
- Create and organize asset isolation groups for security analysis
- Define selectors to automatically group assets based on criteria
- Track asset group membership and changes over time
- Use both legacy asset groups and newer asset group tags APIs
- Monitor member counts and statistics by object type
These capabilities enable you to:
1. Extend BloodHound beyond standard AD/Azure to include custom infrastructure
2. Monitor and improve data collection quality
3. Organize assets for targeted security analysis and risk assessment
4. Create comprehensive attack path models including non-AD/Azure components
2. Relationship names are specific to BloodHound's data model:
- For Azure admins, use :AZGlobalAdmin, not :AZHasRole
- For group membership, use :MemberOf
- For paths, use *1.. to indicate "one or more" relationships
3. Always use pattern matching (p =) when searching for paths
4. Use shortestPath() to find the most direct attack paths
5. Include properties like .displayname, .objectid, or .name based on node type
If you need to understand the proper syntax for BloodHound Cypher queries, refer to the resources provided at:
- bloodhound://cypher/examples for specific query examples
- bloodhound://cypher/patterns for common query patterns
Remember that BloodHound Cypher queries are designed for attack path analysis and differ somewhat from standard Neo4j Cypher queries.
You can reference the already saved Cypher queries in the BloodHound database, and if one of the queries you run does not exist in BloodHound you can save it into the BloodHound server for future use.
You should name the query in a way that is descriptive of what the query does so that it can be easily referenced later.
# Azure Analysis Instructions
When responding to questions about Azure environments (including Azure AD, Entra ID, AzureAD, Microsoft Entra, or any Azure-related resources), you should ALWAYS prioritize using Cypher queries via the run_cypher_query tool instead of basic API endpoints.
For Azure environments, Cypher queries provide more comprehensive and flexible analysis capabilities.
Example Azure-related Cypher patterns:
- Finding Azure Global Admins: MATCH p = (:AZBase)-[:AZGlobalAdmin*1..]->(:AZTenant) RETURN p
- Finding Azure users with admin roles: MATCH p=(u:AZUser)-[r:AZGlobalAdmin|AZPrivilegedRoleAdmin]->(t:AZTenant) RETURN u.displayname, u.objectid, type(r) as role_type
- Finding attack paths to Global Admin: MATCH p=shortestPath((n:AZUser {name:'targetuser@domain.com'})-[*1..]->(a:AZGlobalAdmin)) RETURN p
- You can always fall back to the bloodhound://cypher/examples and bloodhound://cypher/patterns resources as references to construct queries
Always use AZ-prefixed node types (AZUser, AZGroup, AZServicePrincipal, AZApp, AZTenant, etc.) when analyzing Azure environments.
To get information, use the available tools to query the BloodHound database."""
# Define tools for the MCP server
# mcp tools for the /domains apis
@mcp.tool()
def get_domains():
try:
domains = bloodhound_api.domains.get_all()
return json.dumps(
{
"message": f"Found {len(domains)} domains in Bloodhound",
"domains": domains,
}
)
except Exception as e:
logger.error(f"Error retrieving domains: {e}")
return json.dumps({"error": f"Failed to retrieve domains: {str(e)}"})
@mcp.tool()
def search_objects(
query: str, object_type: str = None, limit: int = 100, skip: int = 0
):
"""
Search for objects in the BloodHound database by name or Object ID.
This is useful for finding specific objects when you don't know their exact ID.
Args:
query: Search text - can be a partial name, full name, or Object ID
object_type: Optional filter by object type:
- For Active Directory: User, Computer, Group, GPO, OU, Domain
- For Azure: AZUser, AZGroup, AZDevice, etc.
limit: Maximum number of results to return (default: 100)
skip: Number of results to skip for pagination (default: 0)
"""
try:
results = bloodhound_api.domains.search_objects(
query, object_type, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {results.get('count', 0)} results matching '{query}'",
"results": results.get("data", []),
"count": results.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error searching for objects: {e}")
return json.dumps({"error": f"Failed to search for objects: {str(e)}"})
@mcp.tool()
def get_users(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves users from a specific domain in the Bloodhound database.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of users to return (default: 100)
skip: Number of users to skip for pagination (default: 0)
"""
try:
users = bloodhound_api.domains.get_users(domain_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {users.get('count', 0)} users in the domain",
"users": users.get("data", []),
"count": users.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving users: {e}")
return json.dumps({"error": f"Failed to retrieve users: {str(e)}"})
@mcp.tool()
def get_groups(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves groups from a specific domain in the Bloodhound database.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of groups to return (default: 100)
skip: Number of groups to skip for pagination (default: 0)
"""
try:
groups = bloodhound_api.domains.get_groups(domain_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {groups.get('count', 0)} groups in the domain",
"groups": groups.get("data", []),
"count": groups.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving groups: {e}")
return json.dumps({"error": f"Failed to retrieve groups: {str(e)}"})
@mcp.tool()
def get_computers(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves computers from a specific domain in the Bloodhound database.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of computers to return (default: 100)
skip: Number of computers to skip for pagination (default: 0)
"""
try:
computers = bloodhound_api.domains.get_computers(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computers.get('count', 0)} computers in the domain",
"computers": computers.get("data", []),
"count": computers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computers: {e}")
return json.dumps({"error": f"Failed to retrieve computers: {str(e)}"})
@mcp.tool()
def get_security_controllers(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves security principals that have control relationships over other objects in the domain.
In Bloodhound terminology, a "controller" is any security principal (user, group, computer)
that has some form of control relationship (like AdminTo, WriteOwner, GenericAll, etc.)
over another security object in the domain. These are NOT domain controllers (AD servers),
but rather represent control edges in the graph.
These control relationships are key for identifying potential attack paths in the domain.
Example controllers might include:
- A user with AdminTo rights on a computer
- A group with GenericAll rights over another group
- A user with WriteOwner rights over another user
Args:
domain_id: The ID of the domain to query
limit: Maximum number of control relationships to return (default: 100)
skip: Number of control relationships to skip for pagination (default: 0)
"""
try:
controllers = bloodhound_api.domains.get_controllers(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {controllers.get('count', 0)} controllers",
"controllers": controllers.get("data", []),
"count": controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving controllers: {e}")
return json.dumps({"error": f"Failed to retrieve controllers: {str(e)}"})
@mcp.tool()
def get_gpos(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves Group Policy Objects (GPOs) from a specific domain in the Bloodhound database.
GPOs are containers for policy settings that can be applied to users and computers in Active Directory.
These can be abused for persistence and privilege escalation and are key in idenitfying GPO related edges.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of GPOs to return (default: 100)
skip: Number of GPOs to skip for pagination (default: 0)
"""
try:
gpos = bloodhound_api.domains.get_gpos(domain_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {gpos.get('count', 0)} GPOs in the domain",
"gpos": gpos.get("data", []),
"count": gpos.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving GPOs: {e}")
return json.dumps({"error": f"Failed to retrieve GPOs: {str(e)}"})
@mcp.tool()
def get_ous(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves Organizational Units (OUs) from a specific domain in the Bloodhound database.
OUs are containers within a domain that can hold users, groups, computers, and other OUs.
These are key in understanding the structure of the domain.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of OUs to return (default: 100)
skip: Number of OUs to skip for pagination (default
"""
try:
ous = bloodhound_api.domains.get_ous(domain_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {ous.get('count', 0)} OUs in the domain",
"ous": ous.get("data", []),
"count": ous.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving OUs: {e}")
return json.dumps({"error": f"Failed to retrieve OUs: {str(e)}"})
@mcp.tool()
def get_dc_syncers(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves security principals (users, groups, computers ) that are given the "GetChanges" and "GetChangesAll" permissions on the domain.
The security principals are therefore able to perform a DCSync attack.
They are are great targets for lateral movement or privilege escalation or domain compromise.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of DC Syncers to return (default: 100)
skip: Number of DC Syncers to skip for pagination (default: 0)
"""
try:
dc_syncers = bloodhound_api.domains.get_dc_syncers(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {dc_syncers.get('count', 0)} DC Syncers in the domain",
"dc_syncers": dc_syncers.get("data", []),
"count": dc_syncers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving DC Syncers: {e}")
return json.dumps({"error": f"Failed to retrieve DC Syncers: {str(e)}"})
@mcp.tool()
def get_foreign_admins(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves foreign admins from a specific domain in the Bloodhound database.
"Foreign Admins" are defined as security principals (users, groups, or computers) from one domain that have administrative privileges in another domain within the same forest.
These are potential targets for lateral movement and privilege escalation as well as cross domain compromise.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of foreign admins to return (default: 100)
skip: Number of foreign admins to skip for pagination (default: 0)
"""
try:
foreign_admins = bloodhound_api.domains.get_foreign_admins(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {foreign_admins.get('count', 0)} foreign admins in the domain",
"foreign_admins": foreign_admins.get("data", []),
"count": foreign_admins.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving foreign admins: {e}")
return json.dumps({"error": f"Failed to retrieve foreign admins: {str(e)}"})
@mcp.tool()
def get_foreign_gpo_controllers(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves foreign GPO controllers from a specific domain in the Bloodhound database.
"Foreign GPO Controllers" are defined as security principals (users, groups, or computers) from one domain that have the ability to modify or control Group Policy Objects (GPOs) in another domain within the same forest
These are potential targets for lateral movement and privilege escalation as well as cross domain compromise.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of foreign GPO controllers to return (default: 100)
skip: Number of foreign GPO controllers to skip for pagination (default: 0)
"""
try:
foreign_gpo_controllers = bloodhound_api.domains.get_foreign_gpo_controllers(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {foreign_gpo_controllers.get('count', 0)} foreign GPO controllers in the domain",
"foreign_gpo_controllers": foreign_gpo_controllers.get("data", []),
"count": foreign_gpo_controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving foreign GPO controllers: {e}")
return json.dumps(
{"error": f"Failed to retrieve foreign GPO controllers: {str(e)}"}
)
@mcp.tool()
def get_foreign_groups(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves foreign groups from a specific domain in the Bloodhound database.
"Foreign Groups" are defined as security groups from one domain that have members from another domain within the same forest. They represent cross-domain group memberships in Active Directory.
These are potential targets for lateral movement and privilege escalation as well as cross domain compromise.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of foreign groups to return (default: 100)
skip: Number of foreign groups to skip for pagination (default: 0)
"""
try:
foreign_groups = bloodhound_api.domains.get_foreign_groups(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {foreign_groups.get('count', 0)} foreign groups in the domain",
"foreign_groups": foreign_groups.get("data", []),
"count": foreign_groups.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving foreign groups: {e}")
return json.dumps({"error": f"Failed to retrieve foreign groups: {str(e)}"})
@mcp.tool()
def get_foreign_users(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves foreign users from a specific domain in the Bloodhound database.
"Foreign Users" are defined as user accounts from one domain that are referenced in another domain within the same forest. These represent user accounts that have some form of relationship or access across domain boundaries.
These are potential targets for lateral movement and privilege escalation as well as cross domain compromise.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of foreign users to return (default: 100)
skip: Number of foreign users to skip for pagination (default: 0)
"""
try:
foreign_users = bloodhound_api.domains.get_foreign_users(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {foreign_users.get('count', 0)} foreign users in the domain",
"foreign_users": foreign_users.get("data", []),
"count": foreign_users.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving foreign users: {e}")
return json.dumps({"error": f"Failed to retrieve foreign users: {str(e)}"})
@mcp.tool()
def get_inbound_trusts(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves inbound trusts from a specific domain in the Bloodhound database.
"Inbound Trusts" are defined as trust relationships where the domain is the trusted domain and other domains trust it.
These are potential targets for moving to other external domains or other domains within the forest
Args:
domain_id: The ID of the domain to query
limit: Maximum number of inbound trusts to return (default: 100)
skip: Number of inbound trusts to skip for pagination (default: 0)
"""
try:
inbound_trusts = bloodhound_api.domains.get_inbound_trusts(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {inbound_trusts.get('count', 0)} inbound trusts in the domain",
"inbound_trusts": inbound_trusts.get("data", []),
"count": inbound_trusts.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving inbound trusts: {e}")
return json.dumps({"error": f"Failed to retrieve inbound trusts: {str(e)}"})
@mcp.tool()
def get_linked_gpos(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves linked GPOs from a specific domain in the Bloodhound database.
"Linked GPOs" are defined as Group Policy Objects that have been linked to or associated with specific Active Directory containers such as domains, organizational units (OUs), or sites
These are potential targets for moving laterally, elevating privileges, or maintaining persistence in the domain.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of linked GPOs to return (default: 100)
skip: Number of linked GPOs to skip for pagination (default: 0)
"""
try:
linked_gpos = bloodhound_api.domains.get_linked_gpos(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {linked_gpos.get('count', 0)} linked GPOs in the domain",
"linked_gpos": linked_gpos.get("data", []),
"count": linked_gpos.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving linked GPOs: {e}")
return json.dumps({"error": f"Failed to retrieve linked GPOs: {str(e)}"})
@mcp.tool()
def get_outbound_trusts(domain_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves outbound trusts from a specific domain in the Bloodhound database.
"Outbound Trusts" are defined as trust relationships where the domain trusts other domains.
These are potential targets for accessing resources within another domain and may provide a path into the domain if the external one has weaker security.
Args:
domain_id: The ID of the domain to query
limit: Maximum number of outbound trusts to return (default: 100)
skip: Number of outbound trusts to skip for pagination (default: 0)
"""
try:
outbound_trusts = bloodhound_api.domains.get_outbound_trusts(
domain_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {outbound_trusts.get('count', 0)} outbound trusts in the domain",
"outbound_trusts": outbound_trusts.get("data", []),
"count": outbound_trusts.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving outbound trusts: {e}")
return json.dumps({"error": f"Failed to retrieve outbound trusts: {str(e)}"})
# mcp tools for the /users apis
@mcp.tool()
def get_user_info(user_id: str):
"""
Retrieves information about a specific user in a specific domain.
This provides a general overview of a user's information including their name, domain, and other attributes.
It can be used to conduct reconnaissance and start formulating and targeting users within the domain
Args:
user_id: The ID of the user to query
"""
try:
user_info = bloodhound_api.users.get_info(user_id)
return json.dumps(
{
"message": f"User information for {user_info.get('name')}",
"user_info": user_info,
}
)
except Exception as e:
logger.error(f"Error retrieving user information: {e}")
return json.dumps({"error": f"Failed to retrieve user information: {str(e)}"})
@mcp.tool()
def get_user_admin_rights(user_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the administrative rights of a specific user in the domain.
Administrative rights are privileges that allow a user to perform administrative tasks on a Security Principal (user, group, or computer) in Active Directory.
These rights can be abused in a variety of ways include lateral movement, persistence, and privilege escalation.
Args:
user_id: The ID of the user to query
limit: Maximum number of administrative rights to return (default: 100)
skip: Number of administrative rights to skip for pagination (default: 0)
"""
try:
user_admin_rights = bloodhound_api.users.get_admin_rights(
user_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {user_admin_rights.get('count', 0)} administrative rights for the user",
"user_admin_rights": user_admin_rights.get("data", []),
"count": user_admin_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user administrative rights: {e}")
return json.dumps(
{"error": f"Failed to retrieve user administrative rights: {str(e)}"}
)
@mcp.tool()
def get_user_constrained_delegation_rights(
user_id: str, limit: int = 100, skip: int = 0
):
"""
Retrieves the constrained delegation rights of a specific user within the domain.
Constrained delegation rights allow a user to impersonate another user or service when communicating with a service on another computer.
These rights can be abused for privilege escalation and lateral movement within the domain.
Args:
user_id: The ID of the user to query
limit: Maximum number of constrained delegation rights to return (default: 100)
skip: Number of constrained delegation rights to skip for pagination (default: 0)
"""
try:
user_constrained_delegation_rights = (
bloodhound_api.users.get_constrained_delegation_rights(
user_id, limit=limit, skip=skip
)
)
return json.dumps(
{
"message": f"Found {user_constrained_delegation_rights.get('count', 0)} constrained delegation rights for the user",
"user_constrained_delegation_rights": user_constrained_delegation_rights.get(
"data", []
),
"count": user_constrained_delegation_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user constrained delegation rights: {e}")
return json.dumps(
{
"error": f"Failed to retrieve user constrained delegation rights: {str(e)}"
}
)
@mcp.tool()
def get_user_controllables(user_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the Security Princiapls within the domain that a specific user has administrative control over in the domain.
These are entities that the user can control and manipulate within the domain.
These are potential targets for lateral movement, privilege escalation, and persistence.
Args:
user_id: The ID of the user to query
limit: Maximum number of controllables to return (default: 100)
skip: Number of controllables to skip for pagination (default: 0)
"""
try:
user_controlables = bloodhound_api.users.get_controllables(
user_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {user_controlables.get('count', 0)} controlables for the user",
"user_controlables": user_controlables.get("data", []),
"count": user_controlables.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user controlables: {e}")
return json.dumps({"error": f"Failed to retrieve user controlables: {str(e)}"})
@mcp.tool()
def get_user_controllers(user_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the controllers of a specific user in the domain.
Controllers are entities that have control over the specified user
This can be used to help identify paths to gain access to a specific user.
Args:
user_id: The ID of the user to query
limit: Maximum number of controllers to return (default: 100)
skip: Number of controllers to skip for pagination (default: 0)
"""
try:
user_controllers = bloodhound_api.users.get_controllers(
user_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {user_controllers.get('count', 0)} controllers for the user",
"user_controllers": user_controllers.get("data", []),
"count": user_controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user controllers: {e}")
return json.dumps({"error": f"Failed to retrieve user controllers: {str(e)}"})
@mcp.tool()
def get_user_dcom_rights(user_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the DCOM rights of a specific user within the domain.
DCOM rights allow a user to communicate with COM objects on another computer in the network.
These rights can be abused for privilege escalation and lateral movement within the domain.
Args:
user_id: The ID of the user to query
limit: Maximum number of DCOM rights to return (default: 100)
skip: Number of DCOM rights to skip for pagination (default: 0)
"""
try:
user_dcom_rights = bloodhound_api.users.get_dcom_rights(
user_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {user_dcom_rights.get('count', 0)} DCOM rights for the user",
"user_dcom_rights": user_dcom_rights.get("data", []),
"count": user_dcom_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user DCOM rights: {e}")
return json.dumps({"error": f"Failed to retrieve user DCOM rights: {str(e)}"})
@mcp.tool()
def get_user_memberships(user_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the group memberships of a specific user within the domain.
Group memberships are the groups that a user is a member of within the domain.
These memberships can be used to identify potential targets for lateral movement and privilege escalation.
Args:
user_id: The ID of the user to query
limit: Maximum number of memberships to return (default: 100)
skip: Number of memberships to skip for pagination (default: 0)
"""
try:
user_memberships = bloodhound_api.users.get_memberships(
user_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {user_memberships.get('count', 0)} memberships for the user",
"user_memberships": user_memberships.get("data", []),
"count": user_memberships.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user memberships: {e}")
return json.dumps({"error": f"Failed to retrieve user memberships: {str(e)}"})
@mcp.tool()
def get_user_ps_remote_rights(user_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the remote PowerShell rights of a specific user within the domain.
Remote PowerShell rights allow a user to execute PowerShell commands on a remote computer.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
user_id: The ID of the user to query
limit: Maximum number of remote PowerShell rights to return (default: 100)
skip: Number of remote PowerShell rights to skip for pagination
"""
try:
user_ps_remote_rights = bloodhound_api.users.get_ps_remote_rights(
user_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {user_ps_remote_rights.get('count', 0)} remote PowerShell rights for the user",
"user_ps_remote_rights": user_ps_remote_rights.get("data", []),
"count": user_ps_remote_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user remote PowerShell rights: {e}")
return json.dumps(
{"error": f"Failed to retrieve user remote PowerShell rights: {str(e)}"}
)
@mcp.tool()
def get_user_rdp_rights(user_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the RDP rights of a specific user within the domain.
RDP rights allow a user to remotely connect to another computer using the Remote Desktop Protocol.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
user_id: The ID of the user to query
limit: Maximum number of RDP rights to return (default: 100)
skip: Number of RDP rights to skip for pagination (default: 0)
"""
try:
user_rdp_rights = bloodhound_api.users.get_rdp_rights(
user_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {user_rdp_rights.get('count', 0)} RDP rights for the user",
"user_rdp_rights": user_rdp_rights.get("data", []),
"count": user_rdp_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user RDP rights: {e}")
return json.dumps({"error": f"Failed to retrieve user RDP rights: {str(e)}"})
@mcp.tool()
def get_user_sessions(user_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the active sessions of a specific user within the domain.
Active sessions are the current sessions that a user has within the domain.
These sessions can be used to identify potential targets for lateral movement and privilege escalation.
It can also be used to indentify and plan attack paths within the domain.
Args:
user_id: The ID of the user to query
limit: Maximum number of sessions to return (default: 100)
skip: Number of sessions to skip for pagination (default: 0)
"""
try:
user_sessions = bloodhound_api.users.get_sessions(
user_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {user_sessions.get('count', 0)} sessions for the user",
"user_sessions": user_sessions.get("data", []),
"count": user_sessions.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user sessions: {e}")
return json.dumps({"error": f"Failed to retrieve user sessions: {str(e)}"})
@mcp.tool()
def get_user_sql_admin_rights(user_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the SQL administrative rights of a specific user within the domain.
SQL administrative rights allow a user to perform administrative tasks on a SQL Server.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
user_id: The ID of the user to query
limit: Maximum number of SQL administrative rights to return (default: 100)
skip: Number of SQL administrative rights to skip for pagination (default: 0)
"""
try:
user_sql_admin_rights = bloodhound_api.users.get_sql_admin_rights(
user_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {user_sql_admin_rights.get('count', 0)} SQL administrative rights for the user",
"user_sql_admin_rights": user_sql_admin_rights.get("data", []),
"count": user_sql_admin_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving user SQL administrative rights: {e}")
return json.dumps(
{"error": f"Failed to retrieve user SQL administrative rights: {str(e)}"}
)
# mcp tools for the /groups apis
@mcp.tool()
def get_group_info(group_id: str):
"""
Retrieves information about a specific group in a specific domain.
This provides a general overview of a group's information including their name, domain, and other attributes.
It can be used to conduct reconnaissance and start formulating and targeting groups within the domain
Args:
group_id: The ID of the group to query
"""
try:
group_info = bloodhound_api.groups.get_info(group_id)
return json.dumps(
{
"message": f"Group information for {group_info.get('name')}",
"group_info": group_info,
}
)
except Exception as e:
logger.error(f"Error retrieving group information: {e}")
return json.dumps({"error": f"Failed to retrieve group information: {str(e)}"})
@mcp.tool()
def get_group_admin_rights(group_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the administrative rights of a specific group in the domain.
Administrative rights are privileges that allow a group to perform administrative tasks on a Security Principal (user, group, or computer) in Active Directory.
These rights can be abused in a variety of ways include lateral movement, persistence, and privilege escalation.
Args:
group_id: The ID of the group to query
limit: Maximum number of administrative rights to return (default: 100)
skip: Number of administrative rights to skip for pagination (default: 0)
"""
try:
group_admin_rights = bloodhound_api.groups.get_admin_rights(
group_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {group_admin_rights.get('count', 0)} administrative rights for the group",
"group_admin_rights": group_admin_rights.get("data", []),
"count": group_admin_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving group administrative rights: {e}")
return json.dumps(
{"error": f"Failed to retrieve group administrative rights: {str(e)}"}
)
@mcp.tool()
def get_group_controllables(group_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the Security Princiapls within the domain that a specific group has administrative control over in the domain.
These are entities that the group can control and manipulate within the domain.
These are potential targets for lateral movement, privilege escalation, and persistence.
Args:
group_id: The ID of the group to query
limit: Maximum number of controllables to return (default: 100)
skip: Number of controllables to skip for pagination (default: 0)
"""
try:
group_controlables = bloodhound_api.groups.get_controllables(
group_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {group_controlables.get('count', 0)} controlables for the group",
"group_controlables": group_controlables.get("data", []),
"count": group_controlables.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving group controlables: {e}")
return json.dumps({"error": f"Failed to retrieve group controlables: {str(e)}"})
@mcp.tool()
def get_group_controllers(group_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the controllers of a specific group in the domain.
Controllers are entities that have control over the specified group
This can be used to help identify paths to gain access to a specific group.
Args:
group_id: The ID of the group to query
limit: Maximum number of controllers to return (default: 100)
skip: Number of controllers to skip for pagination (default: 0)
"""
try:
group_controllers = bloodhound_api.groups.get_controllers(
group_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {group_controllers.get('count', 0)} controllers for the group",
"group_controllers": group_controllers.get("data", []),
"count": group_controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving group controllers: {e}")
return json.dumps({"error": f"Failed to retrieve group controllers: {str(e)}"})
@mcp.tool()
def get_group_dcom_rights(group_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the DCOM rights of a specific group within the domain.
DCOM rights allow a group to communicate with COM objects on another computer in the network.
These rights can be abused for privilege escalation and lateral movement within the domain.
Args:
group_id: The ID of the group to query
limit: Maximum number of DCOM rights to return (default: 100)
skip: Number of DCOM rights to skip for pagination (default: 0)
"""
try:
group_dcom_rights = bloodhound_api.groups.get_dcom_rights(
group_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {group_dcom_rights.get('count', 0)} DCOM rights for the group",
"group_dcom_rights": group_dcom_rights.get("data", []),
"count": group_dcom_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving group DCOM rights: {e}")
return json.dumps({"error": f"Failed to retrieve group DCOM rights: {str(e)}"})
@mcp.tool()
def get_group_members(group_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the members of a specific group within the domain.
Group members are the users and groups that are members of the specified group.
These memberships can be used to identify potential targets for lateral movement and privilege escalation.
Args:
group_id: The ID of the group to query
limit: Maximum number of members to return (default: 100)
skip: Number of members to skip for pagination (default: 0)
"""
try:
group_members = bloodhound_api.groups.get_members(
group_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {group_members.get('count', 0)} members for the group",
"group_members": group_members.get("data", []),
"count": group_members.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving group members: {e}")
return json.dumps({"error": f"Failed to retrieve group members: {str(e)}"})
@mcp.tool()
def get_group_memberships(group_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the group memberships of a specific group within the domain.
Group memberships are the groups that the specified group is a member of within the domain.
These memberships can be used to identify potential targets for lateral movement and privilege escalation.
Args:
group_id: The ID of the group to query
limit: Maximum number of memberships to return (default: 100)
skip: Number of memberships to skip for pagination (default: 0)
"""
try:
group_memberships = bloodhound_api.groups.get_memberships(
group_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {group_memberships.get('count', 0)} memberships for the group",
"group_memberships": group_memberships.get("data", []),
"count": group_memberships.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving group memberships: {e}")
return json.dumps({"error": f"Failed to retrieve group memberships: {str(e)}"})
@mcp.tool()
def get_group_ps_remote_rights(group_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the remote PowerShell rights of a specific group within the domain.
Remote PowerShell rights allow a group to execute PowerShell commands on a remote computer.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
group_id: The ID of the group to query
limit: Maximum number of remote PowerShell rights to return (default: 100)
skip: Number of remote PowerShell rights to skip for pagination (default: 0)
"""
try:
group_ps_remote_rights = bloodhound_api.groups.get_ps_remote_rights(
group_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {group_ps_remote_rights.get('count', 0)} remote PowerShell rights for the group",
"group_ps_remote_rights": group_ps_remote_rights.get("data", []),
"count": group_ps_remote_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving group remote PowerShell rights: {e}")
return json.dumps(
{"error": f"Failed to retrieve group remote PowerShell rights: {str(e)}"}
)
@mcp.tool()
def get_group_rdp_rights(group_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the RDP rights of a specific group within the domain.
RDP rights allow a group to remotely connect to another computer using the Remote Desktop Protocol.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
group_id: The ID of the group to query
limit: Maximum number of RDP rights to return (default: 100)
skip: Number of RDP rights to skip for pagination (default: 0)
"""
try:
group_rdp_rights = bloodhound_api.groups.get_rdp_rights(
group_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {group_rdp_rights.get('count', 0)} RDP rights for the group",
"group_rdp_rights": group_rdp_rights.get("data", []),
"count": group_rdp_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving group RDP rights: {e}")
return json.dumps({"error": f"Failed to retrieve group RDP rights: {str(e)}"})
@mcp.tool()
def get_group_sessions(group_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the active sessions of the members of a specific group within the domain.
Active sessions are the current sessions that hte members of this group have within the domain.
These sessions can be used to identify potential targets for lateral movement and privilege escalation.
Args:
group_id: The ID of the group to query
limit: Maximum number of sessions to return (default: 100)
skip: Number of sessions to skip for pagination (default: 0)
"""
try:
group_sessions = bloodhound_api.groups.get_sessions(
group_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {group_sessions.get('count', 0)} sessions for the group",
"group_sessions": group_sessions.get("data", []),
"count": group_sessions.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving group sessions: {e}")
return json.dumps({"error": f"Failed to retrieve group sessions: {str(e)}"})
# mcp tools for the /computers apis
@mcp.tool()
def get_computer_info(computer_id: str):
"""
Retrieves information about a specific computer in a specific domain.
This provides a general overview of a computer's information including their name, domain, and other attributes.
It can be used to conduct reconnaissance and start formulating and targeting computers within the domain
Args:
computer_id: The ID of the computer to query
"""
try:
computer_info = bloodhound_api.computers.get_info(computer_id)
return json.dumps(
{
"message": f"Computer information for {computer_info.get('name')}",
"computer_info": computer_info,
}
)
except Exception as e:
logger.error(f"Error retrieving computer information: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer information: {str(e)}"}
)
@mcp.tool()
def get_computer_admin_rights(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the administrative rights of a specific computer in the domain.
Administrative rights are privileges that allow a computer to perform administrative tasks on a Security Principal (user, group, or computer) in Active Directory.
These rights can be abused in a variety of ways include lateral movement, persistence, and privilege escalation.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of administrative rights to return (default: 100)
skip: Number of administrative rights to skip for pagination (default: 0)
"""
try:
computer_admin_rights = bloodhound_api.computers.get_admin_rights(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_admin_rights.get('count', 0)} administrative rights for the computer",
"computer_admin_rights": computer_admin_rights.get("data", []),
"count": computer_admin_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer administrative rights: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer administrative rights: {str(e)}"}
)
@mcp.tool()
def get_computer_admin_users(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the administrative users of a specific computer in the domain.
Administrative users are the users that have administrative access to the specified computer.
These users can be used to identify potential targets for lateral movement and privilege escalation.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of administrative users to return (default: 100)
skip: Number of administrative users to skip for pagination (default: 0)
"""
try:
computer_admin_users = bloodhound_api.computers.get_admin_users(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_admin_users.get('count', 0)} administrative users for the computer",
"computer_admin_users": computer_admin_users.get("data", []),
"count": computer_admin_users.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer administrative users: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer administrative users: {str(e)}"}
)
@mcp.tool()
def get_computer_constrained_delegation_rights(
computer_id: str, limit: int = 100, skip: int = 0
):
"""
Retrieves the constrained delegation rights of a specific computer within the domain.
Constrained delegation rights allow a computer to impersonate another user or service when communicating with a service on another computer.
These rights can be abused for privilege escalation and lateral movement within the domain.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of constrained delegation rights to return (default: 100)
skip: Number of constrained delegation rights to skip for pagination (default: 0)
"""
try:
computer_constrained_delegation_rights = (
bloodhound_api.computers.get_constrained_delegation_rights(
computer_id, limit=limit, skip=skip
)
)
return json.dumps(
{
"message": f"Found {computer_constrained_delegation_rights.get('count', 0)} constrained delegation rights for the computer",
"computer_constrained_delegation_rights": computer_constrained_delegation_rights.get(
"data", []
),
"count": computer_constrained_delegation_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer constrained delegation rights: {e}")
return json.dumps(
{
"error": f"Failed to retrieve computer constrained delegation rights: {str(e)}"
}
)
@mcp.tool()
def get_computer_constrained_users(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the constrained users of a specific computer in the domain.
Constrained users are the users that have constrained delegation access to the specified computer.
These users can be used to identify potential targets for lateral movement and privilege escalation.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of constrained users to return (default: 100)
skip: Number of constrained users to skip for pagination (default: 0)
"""
try:
computer_constrained_users = bloodhound_api.computers.get_constrained_users(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_constrained_users.get('count', 0)} constrained users for the computer",
"computer_constrained_users": computer_constrained_users.get(
"data", []
),
"count": computer_constrained_users.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer constrained users: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer constrained users: {str(e)}"}
)
@mcp.tool()
def get_computer_controllables(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the Security Princiapls within the domain that a specific computer has administrative control over in the domain.
These are entities that the computer can control and manipulate within the domain.
These are potential targets for lateral movement, privilege escalation, and persistence.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of controllables to return (default: 100)
skip: Number of controllables to skip for pagination (default: 0)
"""
try:
computer_controlables = bloodhound_api.computers.get_controllables(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_controlables.get('count', 0)} controlables for the computer",
"computer_controlables": computer_controlables.get("data", []),
"count": computer_controlables.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer controlables: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer controlables: {str(e)}"}
)
@mcp.tool()
def get_computer_controllers(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the controllers of a specific computer in the domain.
Controllers are entities that have control over the specified computer
This can be used to help identify paths to gain access to a specific computer.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of controllers to return (default: 100)
skip: Number of controllers to skip for pagination (default: 0)
"""
try:
computer_controllers = bloodhound_api.computers.get_controllers(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_controllers.get('count', 0)} controllers for the computer",
"computer_controllers": computer_controllers.get("data", []),
"count": computer_controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer controllers: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer controllers: {str(e)}"}
)
@mcp.tool()
def get_computer_dcom_rights(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the a list of security principals that a specific computer to execute COM on
DCOM rights allow a computer to communicate with COM objects on another computer in the network.
These rights can be abused for privilege escalation and lateral movement within the domain.
"""
try:
computer_dcom_rights = bloodhound_api.computers.get_dcom_rights(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_dcom_rights.get('count', 0)} DCOM rights for the computer",
"computer_dcom_rights": computer_dcom_rights.get("data", []),
"count": computer_dcom_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer DCOM rights: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer DCOM rights: {str(e)}"}
)
@mcp.tool()
def get_computer_dcom_users(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the users that have DCOM rights to a specific computer in the domain.
DCOM rights allow a user to communicate with COM objects on another computer in the network.
These rights can be abused for privilege escalation and lateral movement within the domain.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of DCOM rights to return (default: 100)
skip: Number of DCOM rights to skip for pagination (default: 0)
"""
try:
computer_dcom_users = bloodhound_api.computers.get_dcom_users(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_dcom_users.get('count', 0)} DCOM users for the computer",
"computer_dcom_users": computer_dcom_users.get("data", []),
"count": computer_dcom_users.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer DCOM users: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer DCOM users: {str(e)}"}
)
@mcp.tool()
def get_computer_memberships(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the group memberships of a specific computer within the domain.
Group memberships are the groups that the specified computer is a member of within the domain.
These memberships can be used to identify potential targets for lateral movement and privilege escalation.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of memberships to return (default: 100)
skip: Number of memberships to skip for pagination (default: 0)
"""
try:
computer_memberships = bloodhound_api.computers.get_memberships(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_memberships.get('count', 0)} memberships for the computer",
"computer_memberships": computer_memberships.get("data", []),
"count": computer_memberships.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer memberships: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer memberships: {str(e)}"}
)
@mcp.tool()
def get_computer_ps_remote_rights(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves a list of hosts that this specific computer has the right to PS remote to
Remote PowerShell rights allow a computer to execute PowerShell commands on a remote computer.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of remote PowerShell rights to return (default: 100)
skip: Number of remote PowerShell rights to skip for pagination (default: 0)
"""
try:
computer_ps_remote_rights = bloodhound_api.computers.get_ps_remote_rights(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_ps_remote_rights.get('count', 0)} remote PowerShell rights for the computer",
"computer_ps_remote_rights": computer_ps_remote_rights.get("data", []),
"count": computer_ps_remote_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer remote PowerShell rights: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer remote PowerShell rights: {str(e)}"}
)
@mcp.tool()
def get_computer_ps_remote_users(computer_id: str, limit: int = 100, skip: int = 0):
"""
This retieves the users that have PS remote rights to this specific computer in the domain.
Remote PowerShell rights allow a user to execute PowerShell commands on a remote computer.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of remote PowerShell rights to return (default: 100)
skip: Number of remote PowerShell rights to skip for pagination (default: 0)
"""
try:
computer_ps_remote_users = bloodhound_api.computers.get_ps_remote_users(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_ps_remote_users.get('count', 0)} remote PowerShell users for the computer",
"computer_ps_remote_users": computer_ps_remote_users.get("data", []),
"count": computer_ps_remote_users.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer remote PowerShell users: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer remote PowerShell users: {str(e)}"}
)
@mcp.tool()
def get_computer_rdp_rights(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves a list of hosts that this specific computer has the right to RDP to
RDP rights allow a computer to remotely connect to another computer using the Remote Desktop Protocol.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of RDP rights to return (default: 100)
skip: Number of RDP rights to skip for pagination (default: 0)
"""
try:
computer_rdp_rights = bloodhound_api.computers.get_rdp_rights(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_rdp_rights.get('count', 0)} RDP rights for the computer",
"computer_rdp_rights": computer_rdp_rights.get("data", []),
"count": computer_rdp_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer RDP rights: {e}")
return json.dumps(
{"error": f"Failed to retrieve computer RDP rights: {str(e)}"}
)
@mcp.tool()
def get_computer_rdp_users(computer_id: str, limit: int = 100, skip: int = 0):
"""
This retieves the users that have RDP rights to this specific computer in the domain.
RDP rights allow a user to remotely connect to another computer using the Remote Desktop Protocol.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of RDP rights to return (default: 100)
skip: Number of RDP rights to skip for pagination (default: 0)
"""
try:
computer_rdp_users = bloodhound_api.computers.get_rdp_users(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_rdp_users.get('count', 0)} RDP users for the computer",
"computer_rdp_users": computer_rdp_users.get("data", []),
"count": computer_rdp_users.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer RDP users: {e}")
return json.dumps({"error": f"Failed to retrieve computer RDP users: {str(e)}"})
@mcp.tool()
def get_computer_sessions(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the active sessions of a specific computer within the domain.
Active sessions are the current sessions that a computer has within the domain.
These sessions can be used to identify potential targets for lateral movement and privilege escalation.
These sessions can also be used to formulate and inform on attack paths because if a user has an active session on a host their credentials are cached in memory
Args:
computer_id: The ID of the computer to query
limit: Maximum number of sessions to return (default: 100)
skip: Number of sessions to skip for pagination (default: 0)
"""
try:
computer_sessions = bloodhound_api.computers.get_sessions(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_sessions.get('count', 0)} sessions for the computer",
"computer_sessions": computer_sessions.get("data", []),
"count": computer_sessions.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer sessions: {e}")
return json.dumps({"error": f"Failed to retrieve computer sessions: {str(e)}"})
@mcp.tool()
def get_computer_sql_admin_rights(computer_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the SQL administrative rights of a specific computer within the domain.
SQL administrative rights allow a computer to perform administrative tasks on a SQL Server.
These rights can be abused for lateral movement and privilege escalation within the domain.
Args:
computer_id: The ID of the computer to query
limit: Maximum number of SQL administrative rights to return (default: 100)
skip: Number of SQL administrative rights to skip for pagination (default: 0)
"""
try:
computer_sql_admin_rights = bloodhound_api.computers.get_sql_admin_rights(
computer_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {computer_sql_admin_rights.get('count', 0)} SQL administrative rights for the computer",
"computer_sql_admin_rights": computer_sql_admin_rights.get("data", []),
"count": computer_sql_admin_rights.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving computer SQL administrative rights: {e}")
return json.dumps(
{
"error": f"Failed to retrieve computer SQL administrative rights: {str(e)}"
}
)
# mcp tools for the OUs apis
@mcp.tool()
def get_ou_info(ou_id: str):
"""
Retrieves information about a specific OU in a specific domain.
This provides a general overview of an OU's information including their name, domain, and other attributes.
It can be used to conduct reconnaissance and start formulating and targeting OUs within the domain
Args:
ou_id: The ID of the OU to query
"""
try:
ou_info = bloodhound_api.ous.get_info(ou_id)
return json.dumps(
{"message": f"OU information for {ou_info.get('name')}", "ou_info": ou_info}
)
except Exception as e:
logger.error(f"Error retrieving OU information: {e}")
return json.dumps({"error": f"Failed to retrieve OU information: {str(e)}"})
@mcp.tool()
def get_ou_computers(ou_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the computers within a specific OU in the domain.
This can be used to identify potential targets for lateral movement and privilege escalation.
Args:
ou_id: The ID of the OU to query
limit: Maximum number of computers to return (default: 100)
skip: Number of computers to skip for pagination (default: 0)
"""
try:
ou_computers = bloodhound_api.ous.get_computers(ou_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {ou_computers.get('count', 0)} computers for the OU",
"ou_computers": ou_computers.get("data", []),
"count": ou_computers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving OU computers: {e}")
return json.dumps({"error": f"Failed to retrieve OU computers: {str(e)}"})
@mcp.tool()
def get_ou_groups(ou_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the groups within a specific OU in the domain.
This can be used to identify potential targets for lateral movement and privilege escalation.
Args:
ou_id: The ID of the OU to query
limit: Maximum number of groups to return (default: 100)
skip: Number of groups to skip for pagination (default: 0)
"""
try:
ou_groups = bloodhound_api.ous.get_groups(ou_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {ou_groups.get('count', 0)} groups for the OU",
"ou_groups": ou_groups.get("data", []),
"count": ou_groups.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving OU groups: {e}")
return json.dumps({"error": f"Failed to retrieve OU groups: {str(e)}"})
@mcp.tool()
def get_ou_gpos(ou_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the GPOs within a specific OU in the domain.
This can be used to identify potential targets for lateral movement and privilege escalation.
Args:
ou_id: The ID of the OU to query
limit: Maximum number of GPOs to return (default: 100)
skip: Number of GPOs to skip for pagination (default: 0)
"""
try:
ou_gpos = bloodhound_api.ous.get_gpos(ou_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {ou_gpos.get('count', 0)} GPOs for the OU",
"ou_gpos": ou_gpos.get("data", []),
"count": ou_gpos.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving OU GPOs: {e}")
return json.dumps({"error": f"Failed to retrieve OU GPOs: {str(e)}"})
@mcp.tool()
def get_ou_users(ou_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the users within a specific OU in the domain.
This can be used to identify potential targets for lateral movement and privilege escalation.
Args:
ou_id: The ID of the OU to query
limit: Maximum number of users to return (default: 100)
skip: Number of users to skip for pagination (default: 0)
"""
try:
ou_users = bloodhound_api.ous.get_users(ou_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {ou_users.get('count', 0)} users for the OU",
"ou_users": ou_users.get("data", []),
"count": ou_users.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving OU users: {e}")
return json.dumps({"error": f"Failed to retrieve OU users: {str(e)}"})
# GPO tools
@mcp.tool()
def get_gpo_info(gpo_id: str):
"""
Retrieves information about a specific GPO in a specific domain.
This provides a general overview of a GPO's information including their name, domain, and other attributes.
It can be used to conduct reconnaissance and start formulating and targeting GPOs within the domain
Args:
gpo_id: The ID of the GPO to query
"""
try:
gpo_info = bloodhound_api.gpos.get_info(gpo_id)
return json.dumps(
{
"message": f"GPO information for {gpo_info.get('name')}",
"gpo_info": gpo_info,
}
)
except Exception as e:
logger.error(f"Error retrieving GPO information: {e}")
return json.dumps({"error": f"Failed to retrieve GPO information: {str(e)}"})
@mcp.tool()
def get_gpo_computers(gpo_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the computers within a specific GPO in the domain.
This can be used to identify potential targets for lateral movement and privilege escalation.
Args:
gpo_id: The ID of the GPO to query
limit: Maximum number of computers to return (default: 100)
skip: Number of computers to skip for pagination (default: 0)
"""
try:
gpo_computers = bloodhound_api.gpos.get_computers(
gpo_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {gpo_computers.get('count', 0)} computers for the GPO",
"gpo_computers": gpo_computers.get("data", []),
"count": gpo_computers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving GPO computers: {e}")
return json.dumps({"error": f"Failed to retrieve GPO computers: {str(e)}"})
@mcp.tool()
def get_gpo_controllers(gpo_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the controllers of a specific GPO in the domain.
Controllers are entities that have control over the specified GPO
This can be used to help identify paths to gain access to a specific GPO.
Args:
gpo_id: The ID of the GPO to query
limit: Maximum number of controllers to return (default: 100)
skip: Number of controllers to skip for pagination (default: 0)
"""
try:
gpo_controllers = bloodhound_api.gpos.get_controllers(
gpo_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {gpo_controllers.get('count', 0)} controllers for the GPO",
"gpo_controllers": gpo_controllers.get("data", []),
"count": gpo_controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving GPO controllers: {e}")
return json.dumps({"error": f"Failed to retrieve GPO controllers: {str(e)}"})
@mcp.tool()
def get_gpo_ous(gpo_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the OUs that are linked to a specific GPO in the domain.
This can be used to identify potential targets for lateral movement and privilege escalation.
Args:
gpo_id: The ID of the GPO to query
limit: Maximum number of OUs to return (default: 100)
skip: Number of OUs to skip for pagination (default: 0)
"""
try:
gpo_ous = bloodhound_api.gpos.get_ous(gpo_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {gpo_ous.get('count', 0)} OUs for the GPO",
"gpo_ous": gpo_ous.get("data", []),
"count": gpo_ous.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving GPO OUs: {e}")
return json.dumps({"error": f"Failed to retrieve GPO OUs: {str(e)}"})
@mcp.tool()
def get_gpo_tier_zeros(gpo_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the Tier 0 groups that are linked to a specific GPO in the domain.
Tier 0 groups are the highest privileged groups in the domain and have access to all resources.
This can be used to identify potential targets for lateral movement and privilege escalation.
Args:
gpo_id: The ID of the GPO to query
limit: Maximum number of Tier 0 groups to return (default: 100)
skip: Number of Tier 0 groups to skip for pagination (default: 0)
"""
try:
gpo_tier_zeros = bloodhound_api.gpos.get_tier_zeros(
gpo_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {gpo_tier_zeros.get('count', 0)} Tier 0 groups for the GPO",
"gpo_tier_zeros": gpo_tier_zeros.get("data", []),
"count": gpo_tier_zeros.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving GPO Tier 0 groups: {e}")
return json.dumps({"error": f"Failed to retrieve GPO Tier 0 groups: {str(e)}"})
@mcp.tool()
def get_gpo_users(gpo_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the users within a specific GPO in the domain.
This can be used to identify potential targets for lateral movement and privilege escalation.
Args:
gpo_id: The ID of the GPO to query
limit: Maximum number of users to return (default: 100)
skip: Number of users to skip for pagination (default: 0)
"""
try:
gpo_users = bloodhound_api.gpos.get_users(gpo_id, limit=limit, skip=skip)
return json.dumps(
{
"message": f"Found {gpo_users.get('count', 0)} users for the GPO",
"gpo_users": gpo_users.get("data", []),
"count": gpo_users.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving GPO users: {e}")
return json.dumps({"error": f"Failed to retrieve GPO users: {str(e)}"})
# MCP tools for the /graph apis except for cypher queries to be implemented later
@mcp.tool()
def search_graph(query: str, search_type: str = "fuzzy"):
"""
Search for nodes in the Bloodhound graph by name.
This function lets you find specific nodes in the graph based on a search query.
Results are typically returned as matches on node names.
Args:
query: Search text to find nodes by name
search_type: Type of search to perform - "fuzzy" (default) for approximate matches, "exact" for exact matches
"""
try:
results = bloodhound_api.graph.search(query, search_type)
return json.dumps(
{
"message": f"Search results for '{query}'",
"results": results.get("data", []),
}
)
except Exception as e:
logger.error(f"Error searching graph: {e}")
return json.dumps({"error": f"Failed to search graph: {str(e)}"})
@mcp.tool()
def get_shortest_path(start_node: str, end_node: str, relationship_kinds: str = None):
"""
Find the shortest path between two nodes in the Bloodhound graph.
This is useful for attack path analysis, showing the most direct route between two security principals.
The path will show all the intermediary nodes and the types of relationships connecting them.
If this returns a 500 or 404 error it is likely that the path does not exist within bloodhound
Args:
start_node: Object ID of the starting node (source)
end_node: Object ID of the ending node (target)
relationship_kinds: Optional comma-separated list of relationship types to include in the path
"""
try:
path = bloodhound_api.graph.get_shortest_path(
start_node, end_node, relationship_kinds
)
return json.dumps(
{
"message": f"Shortest path from {start_node} to {end_node}",
"path": path.get("data", {}),
}
)
except Exception as e:
logger.error(f"Error getting shortest path: {e}")
return json.dumps({"error": f"Failed to get shortest path: {str(e)}"})
@mcp.tool()
def get_edge_composition(source_node: int, target_node: int, edge_type: str):
"""
Analyze the components of a complex edge between two nodes.
In Bloodhound, many high-level edges (like "HasPath" or "AdminTo") are composed of multiple
individual relationships. This function reveals those underlying components.
This is useful for understanding exactly how security principals are connected.
Args:
source_node: ID of the source node
target_node: ID of the target node
edge_type: Type of edge to analyze (e.g., "MemberOf", "AdminTo", "CanRDP")
"""
try:
composition = bloodhound_api.graph.get_edge_composition(
source_node, target_node, edge_type
)
return json.dumps(
{
"message": f"Edge composition for {edge_type} edge from {source_node} to {target_node}",
"composition": composition.get("data", {}),
}
)
except Exception as e:
logger.error(f"Error getting edge composition: {e}")
return json.dumps({"error": f"Failed to get edge composition: {str(e)}"})
@mcp.tool()
def get_relay_targets(source_node: int, target_node: int, edge_type: str):
"""
Find valid relay targets for a given edge in the Bloodhound graph.
Relay targets represent potential nodes that could be used to relay an attack or
privilege escalation between two nodes. This is critical for advanced attack path planning.
Args:
source_node: ID of the source node
target_node: ID of the target node
edge_type: Type of edge (relationship) between the nodes
"""
try:
targets = bloodhound_api.graph.get_relay_targets(
source_node, target_node, edge_type
)
return json.dumps(
{
"message": f"Relay targets for {edge_type} edge from {source_node} to {target_node}",
"targets": targets.get("data", {}),
}
)
except Exception as e:
logger.error(f"Error getting relay targets: {e}")
return json.dumps({"error": f"Failed to get relay targets: {str(e)}"})
# MCP Tools for Active Directory Certificate Services (AD CS) APIs
@mcp.tool()
def get_cert_template_info(template_id: str):
"""
Retrieves information about a specific Certificate Template.
Certificate Templates define the properties and security settings for certificates that can be issued.
They can be abused for privilege escalation if misconfigured.
Args:
template_id: The ID of the Certificate Template to query
"""
try:
cert_template_info = bloodhound_api.adcs.get_cert_template_info(template_id)
return json.dumps(
{
"message": f"Certificate Template information for {cert_template_info.get('name', template_id)}",
"cert_template_info": cert_template_info,
}
)
except Exception as e:
logger.error(f"Error retrieving Certificate Template information: {e}")
return json.dumps(
{"error": f"Failed to retrieve Certificate Template information: {str(e)}"}
)
@mcp.tool()
def get_cert_template_controllers(template_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the controllers of a specific Certificate Template.
Controllers are security principals that can modify the Certificate Template or its properties.
This is critical for identifying ESC2 vulnerabilities (vulnerable Certificate Template access control).
Args:
template_id: The ID of the Certificate Template to query
limit: Maximum number of controllers to return (default: 100)
skip: Number of controllers to skip for pagination (default: 0)
"""
try:
cert_template_controllers = bloodhound_api.adcs.get_cert_template_controllers(
template_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {cert_template_controllers.get('count', 0)} controllers for the Certificate Template",
"cert_template_controllers": cert_template_controllers.get("data", []),
"count": cert_template_controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving Certificate Template controllers: {e}")
return json.dumps(
{"error": f"Failed to retrieve Certificate Template controllers: {str(e)}"}
)
# MCP tools for Root CAs
@mcp.tool()
def get_root_ca_info(ca_id: str):
"""
Retrieves information about a specific Root Certificate Authority.
Root CAs are the foundation of trust in a PKI infrastructure.
Controlling a Root CA allows an attacker to issue trusted certificates.
Args:
ca_id: The ID of the Root CA to query
"""
try:
root_ca_info = bloodhound_api.adcs.get_root_ca_info(ca_id)
return json.dumps(
{
"message": f"Root CA information for {root_ca_info.get('name', ca_id)}",
"root_ca_info": root_ca_info,
}
)
except Exception as e:
logger.error(f"Error retrieving Root CA information: {e}")
return json.dumps(
{"error": f"Failed to retrieve Root CA information: {str(e)}"}
)
@mcp.tool()
def get_root_ca_controllers(ca_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the controllers of a specific Root Certificate Authority.
Controllers of a Root CA can compromise the entire PKI infrastructure.
This is critical for identifying ESC4 and ESC5 attack paths.
Args:
ca_id: The ID of the Root CA to query
limit: Maximum number of controllers to return (default: 100)
skip: Number of controllers to skip for pagination (default: 0)
"""
try:
root_ca_controllers = bloodhound_api.adcs.get_root_ca_controllers(
ca_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {root_ca_controllers.get('count', 0)} controllers for the Root CA",
"root_ca_controllers": root_ca_controllers.get("data", []),
"count": root_ca_controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving Root CA controllers: {e}")
return json.dumps(
{"error": f"Failed to retrieve Root CA controllers: {str(e)}"}
)
# MCP tools for Enterprise CAs
@mcp.tool()
def get_enterprise_ca_info(ca_id: str):
"""
Retrieves information about a specific Enterprise Certificate Authority.
Enterprise CAs issue certificates within the organization based on Certificate Templates.
They are critical components in the Active Directory PKI infrastructure.
Args:
ca_id: The ID of the Enterprise CA to query
"""
try:
enterprise_ca_info = bloodhound_api.adcs.get_enterprise_ca_info(ca_id)
return json.dumps(
{
"message": f"Enterprise CA information for {enterprise_ca_info.get('name', ca_id)}",
"enterprise_ca_info": enterprise_ca_info,
}
)
except Exception as e:
logger.error(f"Error retrieving Enterprise CA information: {e}")
return json.dumps(
{"error": f"Failed to retrieve Enterprise CA information: {str(e)}"}
)
@mcp.tool()
def get_enterprise_ca_controllers(ca_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the controllers of a specific Enterprise Certificate Authority.
Controllers of an Enterprise CA can issue arbitrary certificates and potentially compromise the domain.
This is critical for identifying ESC3 and ESC6 attack paths.
Args:
ca_id: The ID of the Enterprise CA to query
limit: Maximum number of controllers to return (default: 100)
skip: Number of controllers to skip for pagination (default: 0)
"""
try:
enterprise_ca_controllers = bloodhound_api.adcs.get_enterprise_ca_controllers(
ca_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {enterprise_ca_controllers.get('count', 0)} controllers for the Enterprise CA",
"enterprise_ca_controllers": enterprise_ca_controllers.get("data", []),
"count": enterprise_ca_controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving Enterprise CA controllers: {e}")
return json.dumps(
{"error": f"Failed to retrieve Enterprise CA controllers: {str(e)}"}
)
# MCP tools for AIA CAs
@mcp.tool()
def get_aia_ca_controllers(ca_id: str, limit: int = 100, skip: int = 0):
"""
Retrieves the controllers of a specific AIA Certificate Authority.
AIA (Authority Information Access) CAs provide additional trust information.
Controllers of an AIA CA may be able to perform certificate-based attacks.
Args:
ca_id: The ID of the AIA CA to query
limit: Maximum number of controllers to return (default: 100)
skip: Number of controllers to skip for pagination (default: 0)
"""
try:
aia_ca_controllers = bloodhound_api.adcs.get_aia_ca_controllers(
ca_id, limit=limit, skip=skip
)
return json.dumps(
{
"message": f"Found {aia_ca_controllers.get('count', 0)} controllers for the AIA CA",
"aia_ca_controllers": aia_ca_controllers.get("data", []),
"count": aia_ca_controllers.get("count", 0),
}
)
except Exception as e:
logger.error(f"Error retrieving AIA CA controllers: {e}")
return json.dumps({"error": f"Failed to retrieve AIA CA controllers: {str(e)}"})
# MCP tools for getting the AI to leverage Cypher Queries
# Enhanced run_cypher_query MCP tool for main.py
# Replace the existing run_cypher_query function with this version
@mcp.tool()
def run_cypher_query(query: str, include_properties: bool = True):
"""
Run a custom Cypher query on the BloodHound Neo4j database.
This tool properly interprets BloodHound's HTTP response codes:
- 200: Query successful with results
- 404: Query successful but no results found (NOT an error!)
- 400: Query syntax error
- 401/403: Authentication/permission issues
- 5xx: Server errors
Args:
query: The Cypher query to execute
include_properties: Whether to include node/edge properties in the response
Returns:
JSON response with graph data (nodes and edges) and execution metadata
"""
try:
# Use the enhanced run_query method
result = bloodhound_api.cypher.run_query(query, include_properties)
# Handle different result structures (for backward compatibility)
if isinstance(result, dict) and "metadata" in result:
# New format with metadata
has_results = result["metadata"].get("has_results", True)
result_data = result.get("data", result)
metadata = result["metadata"]
else:
# Legacy format or direct data
result_data = result
has_results = bool(result_data.get("nodes") or result_data.get("edges"))
metadata = {
"status": "success_with_results" if has_results else "success_no_results",
"query": query,
"has_results": has_results,
"status_code": 200
}
# Check if query found results or not
if has_results:
return json.dumps({
"success": True,
"message": "Cypher query executed successfully - results found",
"result": result_data,
"metadata": metadata,
"query_info": {
"query": query,
"execution_status": "success_with_results",
"result_count": {
"nodes": len(result_data.get("nodes", [])),
"edges": len(result_data.get("edges", []))
}
}
})
else:
# Query was successful but found no results
return json.dumps({
"success": True,
"message": "Cypher query executed successfully - no results found",
"result": result_data,
"metadata": metadata,
"query_info": {
"query": query,
"execution_status": "success_no_results",
"interpretation": "This may indicate: 1) No attack paths exist, 2) No objects match the criteria, 3) The environment doesn't have the queried objects"
}
})
except BloodhoundAPIError as e:
# Handle different types of API errors appropriately
if e.status_code == 400:
return json.dumps({
"success": False,
"error_type": "syntax_error",
"message": "Cypher query syntax error",
"error": str(e),
"query": query,
"suggestions": [
"Check node labels (User, Computer, Group, Domain, AZUser, etc.)",
"Verify relationship types (MemberOf, AdminTo, HasSession, etc.)",
"Ensure property names are correct (.name, .enabled, .owned, etc.)",
"Use get_bloodhound_schema() to see available options",
"Try validate_cypher_query() first to check syntax"
]
})
elif e.status_code is not None and e.status_code in [401, 403]:
return json.dumps({
"success": False,
"error_type": "authentication_error",
"message": "Authentication or permission error",
"error": str(e),
"suggestions": [
"Check your BloodHound API credentials",
"Verify your user has Cypher query permissions",
"Ensure your API token hasn't expired"
]
})
elif e.status_code is not None and e.status_code >= 500:
return json.dumps({
"success": False,
"error_type": "server_error",
"message": "BloodHound server error",
"error": str(e),
"suggestions": [
"Check if BloodHound server is running",
"Try a simpler query to test connectivity",
"Check BloodHound server logs for details"
]
})
else:
return json.dumps({
"success": False,
"error_type": "unknown_error",
"message": f"Unexpected API error (HTTP {e.status_code if e.status_code is not None else 'unknown'})",
"error": str(e),
"query": query
})
except BloodhoundConnectionError as e:
return json.dumps({
"success": False,
"error_type": "connection_error",
"message": "Failed to connect to BloodHound server",
"error": str(e),
"suggestions": [
"Check if BloodHound server is running",
"Verify network connectivity",
"Check your BLOODHOUND_DOMAIN environment variable"
]
})
except Exception as e:
logger.error(f"Unexpected error executing Cypher query: {e}")
return json.dumps({
"success": False,
"error_type": "unexpected_error",
"message": "Unexpected error executing Cypher query",
"error": f"Failed to execute Cypher query: {str(e)}",
"query": query
})
# Add a new tool for checking query execution status
@mcp.tool()
def interpret_cypher_result(query: str, result_json: str):
"""
Help interpret the results of a Cypher query for offensive security analysis.
Args:
query: The original Cypher query that was executed
result_json: The JSON result from run_cypher_query
Returns:
Human-readable interpretation of what the results mean for security analysis
"""
try:
result = json.loads(result_json) if isinstance(result_json, str) else result_json
if not result.get("success", False):
return json.dumps({
"interpretation": "Query failed to execute",
"error_analysis": result.get("error", "Unknown error"),
"recommendations": result.get("suggestions", [])
})
query_lower = query.lower()
result_data = result.get("result", {})
nodes = result_data.get("nodes", [])
edges = result_data.get("edges", [])
interpretations = []
if "domain admin" in query_lower:
if nodes:
interpretations.append(f"Found {len(nodes)} objects related to Domain Admins")
interpretations.append("These represent potential high-value targets or current administrators")
else:
interpretations.append("No Domain Admin relationships found")
interpretations.append("This could mean: 1) No standard DA group exists, 2) No users are currently DAs, 3) Different naming convention used")
elif "kerberoast" in query_lower or "hasspn" in query_lower:
if nodes:
spn_users = [n for n in nodes if n.get("hasspn") == True]
interpretations.append(f"Found {len(spn_users)} Kerberoastable service accounts")
interpretations.append("These accounts can be targeted for credential extraction via Kerberoasting")
else:
interpretations.append("No Kerberoastable accounts found")
interpretations.append("Environment may not have service accounts with SPNs")
elif "owned" in query_lower:
if nodes:
owned_objects = [n for n in nodes if n.get("owned") == True]
interpretations.append(f"Found {len(owned_objects)} compromised objects")
interpretations.append("These represent your current foothold in the environment")
else:
interpretations.append("No owned objects marked in BloodHound")
interpretations.append("Mark compromised accounts as 'owned' to see attack paths")
elif "shortestpath" in query_lower:
if edges:
interpretations.append(f"Found attack path with {len(edges)} steps")
interpretations.append("This represents a potential escalation route")
else:
interpretations.append("No attack path found between specified objects")
interpretations.append("May need to compromise intermediate objects first")
elif any(azure_term in query_lower for azure_term in ["azure", "azuser", "azglobal"]):
if nodes:
interpretations.append(f"Found {len(nodes)} Azure/Entra ID objects")
interpretations.append("Focus on privileged roles and service principals")
else:
interpretations.append("No Azure objects found")
interpretations.append("Environment may be on-premises only or data not collected")
else:
if nodes:
interpretations.append(f"Query returned {len(nodes)} nodes and {len(edges)} relationships")
interpretations.append("Review the objects for potential security impact")
else:
interpretations.append("Query found no matching objects")
return json.dumps({
"query_type": "offensive_security_analysis",
"interpretation": interpretations,
"tactical_recommendations": [
"Review high-value objects first",
"Look for attack paths to privileged accounts",
"Check for misconfigurations or excessive permissions",
"Mark newly compromised objects as 'owned'"
],
"data_summary": {
"nodes_found": len(nodes),
"relationships_found": len(edges),
"has_results": len(nodes) > 0 or len(edges) > 0
}
})
except Exception as e:
return json.dumps({
"error": f"Failed to interpret result: {str(e)}",
"recommendation": "Check the result format and try again"
})
# Create saved query management tools
@mcp.tool()
def create_saved_query(name: str, query: str):
"""
Create a new saved Cypher query.
Args:
name: Name for the saved query
query: The Cypher query to save
Returns:
JSON response with the created saved query data
"""
try:
saved_query = bloodhound_api.cypher.create_saved_query(name, query)
return json.dumps(
{
"message": f"Successfully created saved query: {name}",
"query": saved_query,
}
)
except Exception as e:
logger.error(f"Error creating saved query: {e}")
return json.dumps({"error": f"Failed to create saved query: {str(e)}"})
# list already saved queries
@mcp.tool()
def list_saved_queries(skip: int = 0, limit: int = 100, name: str = None):
"""
List saved Cypher queries.
Args:
skip: Number of queries to skip for pagination
limit: Maximum number of queries to return
name: Filter by query name
Returns:
JSON response with list of saved queries
"""
try:
queries = bloodhound_api.cypher.list_saved_queries(skip, limit, name)
return json.dumps(
{"message": f"Found {len(queries)} saved queries", "queries": queries}
)
except Exception as e:
logger.error(f"Error listing saved queries: {e}")
return json.dumps({"error": f"Failed to list saved queries: {str(e)}"})
# ===============================
# DATA QUALITY API TOOLS
# ===============================
@mcp.tool()
def get_data_completeness_stats():
"""
Get database completeness statistics showing the percentage of local admins
and sessions collected across the BloodHound database.
This is useful for understanding data quality and collection coverage.
Returns:
JSON response with completeness statistics
"""
try:
stats = bloodhound_api.data_quality.get_completeness_stats()
return json.dumps({
"message": "Successfully retrieved database completeness statistics",
"completeness_stats": stats.get("data", {}),
})
except Exception as e:
logger.error(f"Error retrieving completeness stats: {e}")
return json.dumps({"error": f"Failed to retrieve completeness stats: {str(e)}"})
@mcp.tool()
def get_ad_domain_data_quality_stats(
domain_id: str,
start: str = None,
end: str = None,
sort_by: str = None,
skip: int = 0,
limit: int = 100
):
"""
Get data quality statistics for a specific Active Directory domain.
Provides time series data showing collection quality over time.
Args:
domain_id: The ID of the AD domain to query
start: Beginning datetime in RFC-3339 format (optional)
end: Ending datetime in RFC-3339 format (optional)
sort_by: Sort by field - 'created_at' or 'updated_at' (optional)
skip: Number of results to skip for pagination
limit: Maximum number of results to return
Returns:
JSON response with AD domain data quality statistics
"""
try:
stats = bloodhound_api.data_quality.get_ad_domain_data_quality_stats(
domain_id, start, end, sort_by, skip, limit
)
return json.dumps({
"message": f"Successfully retrieved AD domain data quality stats for domain: {domain_id}",
"domain_id": domain_id,
"data_quality_stats": stats.get("data", []),
"count": stats.get("count", 0),
})
except Exception as e:
logger.error(f"Error retrieving AD domain data quality stats: {e}")
return json.dumps({"error": f"Failed to retrieve AD domain data quality stats: {str(e)}"})
@mcp.tool()
def get_azure_tenant_data_quality_stats(
tenant_id: str,
start: str = None,
end: str = None,
sort_by: str = None,
skip: int = 0,
limit: int = 100
):
"""
Get data quality statistics for a specific Azure tenant.
Provides time series data showing collection quality over time.
Args:
tenant_id: The ID of the Azure tenant to query
start: Beginning datetime in RFC-3339 format (optional)
end: Ending datetime in RFC-3339 format (optional)
sort_by: Sort by field - 'created_at' or 'updated_at' (optional)
skip: Number of results to skip for pagination
limit: Maximum number of results to return
Returns:
JSON response with Azure tenant data quality statistics
"""
try:
stats = bloodhound_api.data_quality.get_azure_tenant_data_quality_stats(
tenant_id, start, end, sort_by, skip, limit
)
return json.dumps({
"message": f"Successfully retrieved Azure tenant data quality stats for tenant: {tenant_id}",
"tenant_id": tenant_id,
"data_quality_stats": stats.get("data", []),
"count": stats.get("count", 0),
})
except Exception as e:
logger.error(f"Error retrieving Azure tenant data quality stats: {e}")
return json.dumps({"error": f"Failed to retrieve Azure tenant data quality stats: {str(e)}"})
@mcp.tool()
def get_platform_data_quality_stats(
platform_id: str,
start: str = None,
end: str = None,
sort_by: str = None,
skip: int = 0,
limit: int = 100
):
"""
Get aggregate data quality statistics for a platform (AD or Azure).
Provides time series data showing overall collection quality.
Args:
platform_id: Platform ID - must be 'ad' or 'azure'
start: Beginning datetime in RFC-3339 format (optional)
end: Ending datetime in RFC-3339 format (optional)
sort_by: Sort by field - 'created_at' or 'updated_at' (optional)
skip: Number of results to skip for pagination
limit: Maximum number of results to return
Returns:
JSON response with platform data quality statistics
"""
try:
stats = bloodhound_api.data_quality.get_platform_data_quality_stats(
platform_id, start, end, sort_by, skip, limit
)
return json.dumps({
"message": f"Successfully retrieved platform data quality stats for: {platform_id}",
"platform_id": platform_id,
"data_quality_stats": stats.get("data", []),
"count": stats.get("count", 0),
})
except Exception as e:
logger.error(f"Error retrieving platform data quality stats: {e}")
return json.dumps({"error": f"Failed to retrieve platform data quality stats: {str(e)}"})
# ===============================
# CUSTOM NODES API TOOLS
# ===============================
@mcp.tool()
def get_all_custom_nodes():
"""
Get all custom node configurations from BloodHound.
Custom nodes represent objects outside of standard AD and Azure types.
Returns:
JSON response with all custom node configurations including display settings
"""
try:
nodes = bloodhound_api.custom_nodes.get_all_custom_nodes()
return json.dumps({
"message": f"Successfully retrieved {len(nodes.get('data', []))} custom node configurations",
"custom_nodes": nodes.get("data", []),
})
except Exception as e:
logger.error(f"Error retrieving custom nodes: {e}")
return json.dumps({"error": f"Failed to retrieve custom nodes: {str(e)}"})
@mcp.tool()
def get_custom_node(kind_name: str):
"""
Get configuration for a specific custom node kind.
Args:
kind_name: The name of the custom node kind to query
Returns:
JSON response with the custom node configuration
"""
try:
node = bloodhound_api.custom_nodes.get_custom_node(kind_name)
return json.dumps({
"message": f"Successfully retrieved custom node configuration for: {kind_name}",
"custom_node": node.get("data", {}),
})
except Exception as e:
logger.error(f"Error retrieving custom node {kind_name}: {e}")
return json.dumps({"error": f"Failed to retrieve custom node {kind_name}: {str(e)}"})
@mcp.tool()
def create_custom_nodes(custom_types_json: str):
"""
Create new custom node kinds with display metadata.
Args:
custom_types_json: JSON string containing custom types configuration.
Each type should have an 'icon' object with:
- type: "font-awesome"
- name: Icon name (without fa- prefix)
- color: Color in #RGB or #RRGGBB format
Example custom_types_json:
{
"SQLServer": {
"icon": {
"type": "font-awesome",
"name": "database",
"color": "#FF0000"
}
},
"WebApp": {
"icon": {
"type": "font-awesome",
"name": "globe",
"color": "#00FF00"
}
}
}
Returns:
JSON response with created custom node configurations
"""
try:
import json as json_module
custom_types = json_module.loads(custom_types_json)
# Validate each icon configuration
for kind_name, config in custom_types.items():
if "icon" in config:
validation = bloodhound_api.custom_nodes.validate_icon_config(config["icon"])
if not validation["valid"]:
return json.dumps({
"error": f"Invalid icon configuration for {kind_name}: {validation['errors']}"
})
nodes = bloodhound_api.custom_nodes.create_custom_nodes(custom_types)
return json.dumps({
"message": f"Successfully created {len(nodes.get('data', []))} custom node types",
"created_nodes": nodes.get("data", []),
})
except json_module.JSONDecodeError as e:
return json.dumps({"error": f"Invalid JSON format: {str(e)}"})
except Exception as e:
logger.error(f"Error creating custom nodes: {e}")
return json.dumps({"error": f"Failed to create custom nodes: {str(e)}"})
@mcp.tool()
def update_custom_node(kind_name: str, config_json: str):
"""
Update existing custom node kind with display metadata.
Args:
kind_name: The name of the custom node kind to update
config_json: JSON string containing the icon configuration:
{
"icon": {
"type": "font-awesome",
"name": "icon_name",
"color": "#RRGGBB"
}
}
Returns:
JSON response with updated custom node configuration
"""
try:
import json as json_module
config = json_module.loads(config_json)
# Validate icon configuration if present
if "icon" in config:
validation = bloodhound_api.custom_nodes.validate_icon_config(config["icon"])
if not validation["valid"]:
return json.dumps({
"error": f"Invalid icon configuration: {validation['errors']}"
})
node = bloodhound_api.custom_nodes.update_custom_node(kind_name, config)
return json.dumps({
"message": f"Successfully updated custom node: {kind_name}",
"updated_node": node.get("data", {}),
})
except json_module.JSONDecodeError as e:
return json.dumps({"error": f"Invalid JSON format: {str(e)}"})
except Exception as e:
logger.error(f"Error updating custom node {kind_name}: {e}")
return json.dumps({"error": f"Failed to update custom node {kind_name}: {str(e)}"})
@mcp.tool()
def delete_custom_node(kind_name: str):
"""
Delete configuration for a specific custom node kind.
Args:
kind_name: The name of the custom node kind to delete
Returns:
JSON response confirming deletion
"""
try:
bloodhound_api.custom_nodes.delete_custom_node(kind_name)
return json.dumps({
"message": f"Successfully deleted custom node: {kind_name}",
})
except Exception as e:
logger.error(f"Error deleting custom node {kind_name}: {e}")
return json.dumps({"error": f"Failed to delete custom node {kind_name}: {str(e)}"})
@mcp.tool()
def validate_custom_node_icon(icon_config_json: str):
"""
Validate icon configuration for custom nodes before creating/updating.
Args:
icon_config_json: JSON string containing icon configuration:
{
"type": "font-awesome",
"name": "icon_name",
"color": "#RRGGBB"
}
Returns:
JSON response with validation results including any warnings or errors
"""
try:
import json as json_module
icon_config = json_module.loads(icon_config_json)
validation = bloodhound_api.custom_nodes.validate_icon_config(icon_config)
return json.dumps({
"message": "Icon validation completed",
"validation_result": validation,
})
except json_module.JSONDecodeError as e:
return json.dumps({"error": f"Invalid JSON format: {str(e)}"})
except Exception as e:
logger.error(f"Error validating icon config: {e}")
return json.dumps({"error": f"Failed to validate icon config: {str(e)}"})
# ===============================
# ASSET GROUPS API TOOLS
# ===============================
@mcp.tool()
def list_asset_groups(
sort_by: str = None,
name: str = None,
tag: str = None,
system_group: bool = None,
member_count: int = None,
asset_group_id: int = None
):
"""
List all asset isolation groups with optional filtering.
Asset groups help organize and isolate sets of assets for security analysis.
Args:
sort_by: Sort by field - 'name', 'tag', or 'member_count' (optional)
name: Filter by asset group name (optional)
tag: Filter by asset group tag (optional)
system_group: Filter by system group status (optional)
member_count: Filter by member count (optional)
asset_group_id: Filter by specific asset group ID (optional)
Returns:
JSON response with list of asset groups and their configurations
"""
try:
groups = bloodhound_api.asset_groups.list_asset_groups(
sort_by=sort_by,
name=name,
tag=tag,
system_group=system_group,
member_count=member_count,
asset_group_id=asset_group_id
)
asset_groups = groups.get("data", {}).get("asset_groups", [])
return json.dumps({
"message": f"Successfully retrieved {len(asset_groups)} asset groups",
"asset_groups": asset_groups,
})
except Exception as e:
logger.error(f"Error listing asset groups: {e}")
return json.dumps({"error": f"Failed to list asset groups: {str(e)}"})
@mcp.tool()
def create_asset_group(name: str, tag: str):
"""
Create a new asset group for organizing and isolating assets.
Args:
name: Name of the asset group
tag: Tag for the asset group (used for identification and filtering)
Returns:
JSON response with created asset group configuration
"""
try:
group = bloodhound_api.asset_groups.create_asset_group(name, tag)
return json.dumps({
"message": f"Successfully created asset group: {name}",
"asset_group": group.get("data", {}),
})
except Exception as e:
logger.error(f"Error creating asset group: {e}")
return json.dumps({"error": f"Failed to create asset group: {str(e)}"})
@mcp.tool()
def get_asset_group(asset_group_id: int):
"""
Get details for a specific asset group by ID.
Args:
asset_group_id: ID of the asset group to retrieve
Returns:
JSON response with asset group configuration and details
"""
try:
group = bloodhound_api.asset_groups.get_asset_group(asset_group_id)
return json.dumps({
"message": f"Successfully retrieved asset group ID: {asset_group_id}",
"asset_group": group.get("data", {}),
})
except Exception as e:
logger.error(f"Error retrieving asset group {asset_group_id}: {e}")
return json.dumps({"error": f"Failed to retrieve asset group {asset_group_id}: {str(e)}"})
@mcp.tool()
def update_asset_group(asset_group_id: int, name: str):
"""
Update an existing asset group's name.
Args:
asset_group_id: ID of the asset group to update
name: New name for the asset group
Returns:
JSON response with updated asset group configuration
"""
try:
group = bloodhound_api.asset_groups.update_asset_group(asset_group_id, name)
return json.dumps({
"message": f"Successfully updated asset group ID: {asset_group_id}",
"asset_group": group.get("data", {}),
})
except Exception as e:
logger.error(f"Error updating asset group {asset_group_id}: {e}")
return json.dumps({"error": f"Failed to update asset group {asset_group_id}: {str(e)}"})
@mcp.tool()
def delete_asset_group(asset_group_id: int):
"""
Delete an asset group.
Args:
asset_group_id: ID of the asset group to delete
Returns:
JSON response confirming deletion
"""
try:
bloodhound_api.asset_groups.delete_asset_group(asset_group_id)
return json.dumps({
"message": f"Successfully deleted asset group ID: {asset_group_id}",
})
except Exception as e:
logger.error(f"Error deleting asset group {asset_group_id}: {e}")
return json.dumps({"error": f"Failed to delete asset group {asset_group_id}: {str(e)}"})
@mcp.tool()
def list_asset_group_collections(asset_group_id: int, skip: int = 0, limit: int = 100):
"""
List asset group collections (historical memberships).
Collections represent snapshots of asset group membership over time.
Args:
asset_group_id: ID of the asset group
skip: Number of results to skip for pagination
limit: Maximum number of results to return
Returns:
JSON response with asset group collections
"""
try:
collections = bloodhound_api.asset_groups.list_asset_group_collections(
asset_group_id, skip=skip, limit=limit
)
return json.dumps({
"message": f"Successfully retrieved collections for asset group ID: {asset_group_id}",
"collections": collections.get("data", []),
})
except Exception as e:
logger.error(f"Error retrieving asset group collections: {e}")
return json.dumps({"error": f"Failed to retrieve asset group collections: {str(e)}"})
@mcp.tool()
def get_asset_group_member_counts(asset_group_id: int):
"""
Get member count statistics for an asset group, broken down by object kind.
Args:
asset_group_id: ID of the asset group
Returns:
JSON response with total count and counts by object kind
"""
try:
counts = bloodhound_api.asset_groups.list_asset_group_member_counts(asset_group_id)
data = counts.get("data", {})
return json.dumps({
"message": f"Successfully retrieved member counts for asset group ID: {asset_group_id}",
"total_count": data.get("total_count", 0),
"counts_by_kind": data.get("counts", {}),
})
except Exception as e:
logger.error(f"Error retrieving asset group member counts: {e}")
return json.dumps({"error": f"Failed to retrieve asset group member counts: {str(e)}"})
@mcp.tool()
def update_asset_group_selectors(asset_group_id: int, selectors_json: str):
"""
Update selectors for an asset group. Selectors define which objects
should be included in the asset group based on criteria.
Args:
asset_group_id: ID of the asset group
selectors_json: JSON string containing array of selector specifications
Example selectors_json:
[
{
"name": "Domain Controllers",
"selector": "n:Computer WHERE n.serviceprincipalnames CONTAINS 'ldap/'",
"system_selector": false
}
]
Returns:
JSON response with updated asset group configuration
"""
try:
import json as json_module
selectors = json_module.loads(selectors_json)
result = bloodhound_api.asset_groups.update_asset_group_selectors(
asset_group_id, selectors
)
return json.dumps({
"message": f"Successfully updated selectors for asset group ID: {asset_group_id}",
"asset_group": result.get("data", {}),
})
except json_module.JSONDecodeError as e:
return json.dumps({"error": f"Invalid JSON format: {str(e)}"})
except Exception as e:
logger.error(f"Error updating asset group selectors: {e}")
return json.dumps({"error": f"Failed to update asset group selectors: {str(e)}"})
# ===============================
# ASSET GROUP TAGS API TOOLS
# ===============================
@mcp.tool()
def list_asset_group_tags(
sort_by: str = None,
name: str = None,
tag: str = None,
skip: int = 0,
limit: int = 100
):
"""
List asset group tags using the newer tags API.
Asset group tags provide enhanced functionality over basic asset groups.
Args:
sort_by: Sort by field (optional)
name: Filter by tag name (optional)
tag: Filter by tag value (optional)
skip: Number of results to skip for pagination
limit: Maximum number of results to return
Returns:
JSON response with list of asset group tags
"""
try:
tags = bloodhound_api.asset_groups.list_asset_group_tags(
sort_by=sort_by, name=name, tag=tag, skip=skip, limit=limit
)
return json.dumps({
"message": f"Successfully retrieved asset group tags",
"asset_group_tags": tags.get("data", []),
"count": tags.get("count", 0),
})
except Exception as e:
logger.error(f"Error listing asset group tags: {e}")
return json.dumps({"error": f"Failed to list asset group tags: {str(e)}"})
@mcp.tool()
def create_asset_group_tag(name: str, tag: str):
"""
Create a new asset group tag.
Args:
name: Name of the asset group tag
tag: Tag value
Returns:
JSON response with created asset group tag
"""
try:
tag_obj = bloodhound_api.asset_groups.create_asset_group_tag(name, tag)
return json.dumps({
"message": f"Successfully created asset group tag: {name}",
"asset_group_tag": tag_obj.get("data", {}),
})
except Exception as e:
logger.error(f"Error creating asset group tag: {e}")
return json.dumps({"error": f"Failed to create asset group tag: {str(e)}"})
@mcp.tool()
def get_asset_group_tag_members(asset_group_tag_id: int, skip: int = 0, limit: int = 100):
"""
List members of a specific asset group tag.
Args:
asset_group_tag_id: ID of the asset group tag
skip: Number of results to skip for pagination
limit: Maximum number of results to return
Returns:
JSON response with list of members in the asset group tag
"""
try:
members = bloodhound_api.asset_groups.list_asset_group_tag_members(
asset_group_tag_id, skip=skip, limit=limit
)
return json.dumps({
"message": f"Successfully retrieved members for asset group tag ID: {asset_group_tag_id}",
"members": members.get("data", []),
"count": members.get("count", 0),
})
except Exception as e:
logger.error(f"Error retrieving asset group tag members: {e}")
return json.dumps({"error": f"Failed to retrieve asset group tag members: {str(e)}"})
# main function to start the server
async def main():
"""Main function to start the server"""
# Test connection to Bloodhound API
try:
version_info = bloodhound_api.test_connection()
if version_info:
logger.info(
f"Successfully connected to Bloodhound API. Version: {version_info}"
)
else:
logger.error("Failed to connect to Bloodhound API")
except Exception as e:
logger.error(f"Error connecting to Bloodhound API: {e}")
# Run the MCP server
await mcp.run_stdio_async()
if __name__ == "__main__":
import asyncio
parser = argparse.ArgumentParser(description="Bloodhound CE MCP Server")
args = parser.parse_args()
asyncio.run(main())