Skip to main content
Glama
bloodhound.py (7.17 kB)
"""BloodHound API call automation (work in progress)."""

import base64
import hashlib
import hmac
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

import requests

from pentestmcp import config

# Populate the config module before any BHE_* setting is read below.
config.load_variables()

BHE_SCHEME = "http"
PRINT_PRINCIPALS = False
PRINT_ATTACK_PATH_TIMELINE_DATA = False
PRINT_POSTURE_DATA = False
DATA_START = "1970-01-01T00:00:00.000Z"
# "Now" at import time, truncated to milliseconds with a trailing 'Z'.
DATA_END = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"


class Credentials:
    """API token pair used to sign requests."""

    def __init__(self, token_id: str, token_key: str) -> None:
        self.token_id = token_id
        self.token_key = token_key


class Domain:
    """Plain record describing a domain (name, id, type, collected flag)."""

    def __init__(self, name: str, id: str, collected: bool, domain_type: str) -> None:
        # NOTE: the `id` parameter shadows the builtin; the name is kept so
        # existing keyword callers continue to work.
        self.name = name
        self.id = id
        self.type = domain_type
        self.collected = collected


class APIVersion:
    """API and server version strings as returned by ``/api/version``."""

    def __init__(self, api_version: str, server_version: str) -> None:
        self.api_version = api_version
        self.server_version = server_version


class Client:
    """Minimal HTTP client that issues HMAC-signed requests to the API."""

    def __init__(self, scheme: str, host: str, port: int, credentials: Credentials) -> None:
        self._scheme = scheme
        self._host = host
        self._port = port
        self._credentials = credentials

    def _format_url(self, uri: str) -> str:
        """Join scheme, host and port with *uri*, dropping one leading slash."""
        formatted_uri = uri[1:] if uri.startswith("/") else uri
        return f"{self._scheme}://{self._host}:{self._port}/{formatted_uri}"

    def _request(
        self,
        method: str,
        uri: str,
        body: Optional[bytes] = None,
        content_type: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> requests.Response:
        """Send a signed request as used by BloodHound CE.

        Args:
            method: HTTP method; also part of the signature.
            uri: Request path (e.g. ``/api/version``); also part of the signature.
            body: Raw request bytes, included in the signature when present.
            content_type: Optional ``Content-Type`` header value, e.g.
                ``application/zip`` or ``application/json``.
            timeout: Passed straight through to ``requests.request``.

        Returns:
            The ``requests.Response`` for a successful (non-error) status.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        # Start the chain with token_key as the HMAC key.
        digester = hmac.new(self._credentials.token_key.encode("utf-8"), None, hashlib.sha256)

        # OperationKey: method + uri (no delimiter).
        digester.update(f"{method}{uri}".encode("utf-8"))

        # Chain: the previous digest becomes the next HMAC key.
        digester = hmac.new(digester.digest(), None, hashlib.sha256)

        # RequestDate (RFC3339-like), in UTC with a trailing 'Z'.
        datetime_formatted = (
            datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
        )

        # Only the hour-resolution prefix (first 13 chars: YYYY-MM-DDTHH) is signed.
        digester.update(datetime_formatted[:13].encode("utf-8"))

        # Chain again.
        digester = hmac.new(digester.digest(), None, hashlib.sha256)

        # Include the body bytes in the signature if present.
        if body is not None:
            digester.update(body)

        # The Signature header must be base64 *text*, not bytes.
        signature_b64 = base64.b64encode(digester.digest()).decode("ascii")

        headers: Dict[str, str] = {
            "User-Agent": "bhe-python-sdk 0001",
            "Authorization": f"bhesignature {self._credentials.token_id}",
            "RequestDate": datetime_formatted,
            "Signature": signature_b64,
        }
        if content_type is not None:
            headers["Content-Type"] = content_type

        # data=body sends the raw bytes exactly as they were signed.
        resp = requests.request(
            method=method,
            url=self._format_url(uri),
            headers=headers,
            data=body,
            timeout=timeout,
        )
        resp.raise_for_status()
        return resp

    def get_version(self) -> APIVersion:
        """Fetch ``/api/version`` and return the parsed version info."""
        response = self._request("GET", "/api/version")
        payload = response.json()
        return APIVersion(
            api_version=payload["data"]["API"]["current_version"],
            server_version=payload["data"]["server_version"],
        )


def _build_client() -> Client:
    """Create a Client from the BHE_* settings held by the config module."""
    # This might be best loaded from a file.
    credentials = Credentials(
        token_id=config.BHE_TOKEN_ID,
        token_key=config.BHE_TOKEN_KEY,
    )
    return Client(
        scheme=BHE_SCHEME,
        host=config.BHE_DOMAIN,
        port=config.BHE_PORT,
        credentials=credentials,
    )


def connectToApi():
    """Connect with the configured credentials and return the current API version string."""
    return _build_client().get_version().api_version


def upload_zip(zip_path: str) -> Dict[str, Any]:
    """Upload a SharpHound zip (or BloodHound JSON zip) via the file-upload job flow.

    Steps:
      1) POST /api/v2/file-upload/start     -> get upload id
      2) POST /api/v2/file-upload/{id}      -> upload bytes (Content-Type: application/zip)
      3) POST /api/v2/file-upload/{id}/end  -> finish job

    Args:
        zip_path: Path of the zip file to upload.

    Returns:
        ``{"status": "accepted", "upload_id": ...}`` when the final call has
        no JSON body to report (e.g. 204 No Content); otherwise the decoded
        JSON of the final response.

    Raises:
        FileNotFoundError: If *zip_path* does not exist.
        RuntimeError: If the start call does not return an upload id.
        requests.HTTPError: If any call answers with an error status.
    """
    client = _build_client()

    p = Path(zip_path)
    if not p.exists():
        raise FileNotFoundError(f"{zip_path} not found")
    zip_bytes = p.read_bytes()

    # 1) Create the upload job. The response body should carry the job id:
    #    { "data": { "id": "<id>" }, ... }
    resp = client._request("POST", "/api/v2/file-upload/start", content_type="application/json")
    start_payload = resp.json()
    upload_id = start_payload.get("data", {}).get("id")
    if not upload_id:
        raise RuntimeError(f"Could not create upload job: {start_payload}")

    # 2) Upload the zip bytes (the body is included when signing).
    upload_uri = f"/api/v2/file-upload/{upload_id}"
    client._request("POST", upload_uri, body=zip_bytes, content_type="application/zip")

    # 3) Finish the job.
    end_uri = f"/api/v2/file-upload/{upload_id}/end"
    end_resp = client._request("POST", end_uri, body=None, content_type="application/json")

    accepted: Dict[str, Any] = {"status": "accepted", "upload_id": upload_id}
    # The API returns 204 No Content on success, so there is nothing to decode.
    if end_resp.status_code == 204 or not end_resp.content:
        return accepted
    # Honour the declared Dict return type instead of leaking the raw
    # Response object; fall back to the status dict if the body is not JSON.
    try:
        return end_resp.json()
    except ValueError:
        return accepted


def list_saved_queries():
    """Return the decoded JSON of GET /api/v2/saved-queries."""
    response = _build_client()._request("GET", "/api/v2/saved-queries")
    return response.json()


def run_cypher_query(query):
    """POST *query* to /api/v2/graphs/cypher and return the decoded JSON response."""
    data = json.dumps({"query": query}).encode("utf-8")
    response = _build_client()._request(
        "POST",
        "/api/v2/graphs/cypher",
        body=data,
        content_type="application/json",
    )
    return response.json()


# TODO: Neo4j implementation coming next.

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/YoussefSahnoun/PentestMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.