from typing import Dict, Optional, Any
from pathlib import Path
from datetime import datetime, timezone
import json
import requests
import hmac
import hashlib
import base64
from pentestmcp import config
"""bloodhound api calls automation in prog....."""
# Load settings before anything else: the helpers in this module read
# config.BHE_* values (presumably populated by load_variables() -- confirm
# in pentestmcp.config).
config.load_variables()

# URL scheme handed to every Client built in this module.
BHE_SCHEME = "http"

# Flags named as print toggles, all off; nothing in the visible code reads them.
PRINT_PRINCIPALS = PRINT_ATTACK_PATH_TIMELINE_DATA = PRINT_POSTURE_DATA = False

# Time-window bounds as millisecond-precision UTC timestamps with a "Z" suffix:
# the Unix epoch through the moment this module is imported.
DATA_START = "1970-01-01T00:00:00.000Z"
DATA_END = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")  # Now
class Credentials:
    """API token pair: ``token_id`` identifies the caller, ``token_key`` is the secret."""

    def __init__(self, token_id: str, token_key: str) -> None:
        # Stored verbatim; no validation or normalisation is applied.
        self.token_id, self.token_key = token_id, token_key
class Domain:
    """Plain record describing one domain.

    Attributes:
        name: Domain name as given by the caller.
        id: Domain identifier as given by the caller.
        type: Value of the ``domain_type`` constructor argument.
        collected: Flag passed by the caller as ``collected``.
    """

    def __init__(self, name: str, id: str, collected: bool, domain_type: str) -> None:
        # ``domain_type`` is exposed under the shorter attribute name ``type``.
        self.name, self.id = name, id
        self.type, self.collected = domain_type, collected
class APIVersion:
    """Pair of version strings: the API version and the server version."""

    def __init__(self, api_version: str, server_version: str) -> None:
        # Both values are kept exactly as supplied.
        self.api_version, self.server_version = api_version, server_version
class Client(object):
    """Small HTTP client for the BloodHound API that signs every request.

    Each request carries an ``Authorization: bhesignature <token_id>`` header,
    a ``RequestDate`` header and a base64 ``Signature`` header produced by a
    three-step HMAC-SHA256 chain keyed with the token key.
    """

    def __init__(self, scheme: str, host: str, port: int, credentials: Credentials) -> None:
        self._scheme = scheme
        self._host = host
        self._port = port
        self._credentials = credentials

    def _format_url(self, uri: str) -> str:
        """Return the absolute URL for ``uri`` on the configured scheme/host/port.

        A single leading ``/`` is dropped so the joined URL does not contain
        a doubled slash after the port.
        """
        formatted_uri = uri[1:] if uri.startswith("/") else uri
        return f"{self._scheme}://{self._host}:{self._port}/{formatted_uri}"

    def _sign(self, method: str, uri: str, request_date: str, body: Optional[bytes]) -> str:
        """Compute the base64 request signature.

        The chain is: HMAC(token_key, method + uri) -> key for
        HMAC(<that>, first 13 chars of ``request_date``) -> key for
        HMAC(<that>, body bytes or nothing).

        Args:
            method: HTTP verb exactly as it will be sent.
            uri: Request path exactly as passed to ``_request`` (not the full URL).
            request_date: Timestamp string sent in the ``RequestDate`` header.
            body: Raw request body, or ``None`` when there is no body.

        Returns:
            The final digest, base64-encoded as an ASCII ``str``.
        """
        # Start the chain with token_key as the HMAC key.
        digester = hmac.new(self._credentials.token_key.encode("utf-8"), None, hashlib.sha256)
        # OperationKey: method + uri (no delimiter).
        digester.update(f"{method}{uri}".encode("utf-8"))

        # Chain: the previous digest becomes the next key.
        digester = hmac.new(digester.digest(), None, hashlib.sha256)
        # Docs say the chain uses hour resolution only (first 13 chars: YYYY-MM-DDTHH).
        digester.update(request_date[:13].encode("utf-8"))

        # Chain again; the body is only mixed in when one is present.
        digester = hmac.new(digester.digest(), None, hashlib.sha256)
        if body is not None:
            digester.update(body)

        # Header values must be text, so decode the base64 bytes to ASCII.
        return base64.b64encode(digester.digest()).decode("ascii")

    def _request(
        self,
        method: str,
        uri: str,
        body: Optional[bytes] = None,
        content_type: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> requests.Response:
        """Send a signed request as used by BloodHound CE.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            uri: Path of the endpoint, e.g. ``"/api/version"``.
            body: Raw bytes to send; they are included in the signature.
            content_type: Value for the ``Content-Type`` header (lets callers
                upload ``application/zip`` or ``application/json``). The header
                is omitted when this is ``None``.
            timeout: Passed straight to ``requests``; ``None`` means no timeout.

        Returns:
            The ``requests.Response`` for a successful call.

        Raises:
            requests.HTTPError: When the server answers with an error status
                (via ``raise_for_status``).
        """
        # RequestDate (RFC3339-like). Use UTC and a trailing 'Z'.
        request_date = datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")

        headers: Dict[str, str] = {
            "User-Agent": "bhe-python-sdk 0001",
            "Authorization": f"bhesignature {self._credentials.token_id}",
            "RequestDate": request_date,
            "Signature": self._sign(method, uri, request_date, body),
        }
        if content_type is not None:
            headers["Content-Type"] = content_type

        # Use data=body so the raw bytes go out exactly as they were signed.
        resp = requests.request(
            method=method,
            url=self._format_url(uri),
            headers=headers,
            data=body,
            timeout=timeout,
        )
        resp.raise_for_status()  # surface HTTP errors to the caller
        return resp

    def get_version(self) -> APIVersion:
        """Query ``/api/version`` and wrap the reported versions.

        Raises:
            KeyError: If the response JSON lacks ``data.API.current_version``
                or ``data.server_version``.
        """
        payload = self._request("GET", "/api/version").json()
        data = payload["data"]
        return APIVersion(api_version=data["API"]["current_version"], server_version=data["server_version"])
def connectToApi():
    """Build a client from the ``config`` settings and return the API version it reports.

    The returned value is the ``api_version`` attribute of
    ``Client.get_version()``; any error raised by that call propagates.
    """
    # This might be best loaded from a file
    creds = Credentials(token_id=config.BHE_TOKEN_ID, token_key=config.BHE_TOKEN_KEY)
    api_client = Client(
        scheme=BHE_SCHEME,
        host=config.BHE_DOMAIN,
        port=config.BHE_PORT,
        credentials=creds,
    )
    version_info = api_client.get_version()
    return version_info.api_version
def upload_zip(zip_path: str) -> Dict[str, Any]:
    """Upload a SharpHound zip (or BloodHound JSON zip) via the BHCE file-upload job flow.

    Steps:
        1) POST /api/v2/file-upload/start     -> get upload id
        2) POST /api/v2/file-upload/{id}      -> upload bytes (Content-Type: application/zip)
        3) POST /api/v2/file-upload/{id}/end  -> finish job

    Args:
        zip_path: Path of the archive to upload.

    Returns:
        Always a dict. When the final call has no body (204 or empty), this is
        ``{"status": "accepted", "upload_id": <id>}``. When it returns a JSON
        object, that object is returned. Any other body is returned under a
        ``"response"`` key next to ``status`` and ``upload_id``.

    Raises:
        FileNotFoundError: If ``zip_path`` does not exist.
        RuntimeError: If the start call does not yield an upload id.
        requests.HTTPError: If any of the three calls returns an error status.
    """
    # Validate and read the archive first so a bad path fails before any
    # client setup or network traffic.
    archive = Path(zip_path)
    if not archive.exists():
        raise FileNotFoundError(f"{zip_path} not found")
    zip_bytes = archive.read_bytes()

    credentials = Credentials(token_id=config.BHE_TOKEN_ID, token_key=config.BHE_TOKEN_KEY)
    client = Client(scheme=BHE_SCHEME, host=config.BHE_DOMAIN, port=config.BHE_PORT, credentials=credentials)

    # 1) Create the upload job. Expected body: { "data": { "id": "<id>" }, ... }
    resp = client._request("POST", "/api/v2/file-upload/start", content_type="application/json")
    start_payload = resp.json()
    # `or {}` also covers an explicit `"data": null`, which would otherwise
    # raise AttributeError instead of the RuntimeError below.
    upload_id = (start_payload.get("data") or {}).get("id")
    if not upload_id:
        raise RuntimeError(f"Could not create upload job: {start_payload}")

    # 2) Upload the zip bytes (the body is part of the request signature).
    client._request("POST", f"/api/v2/file-upload/{upload_id}", body=zip_bytes, content_type="application/zip")

    # 3) Finish the job.
    end_resp = client._request("POST", f"/api/v2/file-upload/{upload_id}/end", body=None, content_type="application/json")

    accepted: Dict[str, Any] = {"status": "accepted", "upload_id": upload_id}
    # Success without a body: there is nothing to decode.
    if end_resp.status_code == 204 or not end_resp.content:
        return accepted

    # Never hand the raw Response object back; callers are promised a dict.
    try:
        end_payload = end_resp.json()
    except ValueError:  # body is not JSON
        end_payload = end_resp.text
    if isinstance(end_payload, dict):
        return end_payload
    accepted["response"] = end_payload
    return accepted
def list_saved_queries():
    """Send a signed GET to ``/api/v2/saved-queries`` and return the decoded JSON body.

    Connection settings and the token pair are read from ``config``; HTTP
    errors raised by the client propagate to the caller.
    """
    creds = Credentials(token_id=config.BHE_TOKEN_ID, token_key=config.BHE_TOKEN_KEY)
    api_client = Client(
        scheme=BHE_SCHEME,
        host=config.BHE_DOMAIN,
        port=config.BHE_PORT,
        credentials=creds,
    )
    return api_client._request("GET", "/api/v2/saved-queries").json()
def run_cypher_query(query):
    """POST ``query`` to ``/api/v2/graphs/cypher`` and return the decoded JSON reply.

    The request body is the UTF-8 encoded JSON object ``{"query": query}``,
    sent with ``Content-Type: application/json``. HTTP errors raised by the
    client propagate to the caller.
    """
    creds = Credentials(token_id=config.BHE_TOKEN_ID, token_key=config.BHE_TOKEN_KEY)
    api_client = Client(
        scheme=BHE_SCHEME,
        host=config.BHE_DOMAIN,
        port=config.BHE_PORT,
        credentials=creds,
    )
    # Serialise once; these exact bytes are both signed and sent.
    request_body = json.dumps({"query": query}).encode("utf-8")
    reply = api_client._request(
        "POST",
        "/api/v2/graphs/cypher",
        body=request_body,
        content_type="application/json",
    )
    return reply.json()
"""NEO4J implementation coming next """