security_audit_sbom_vulnerabilities
Audit a software bill of materials (SBOM) for known vulnerabilities by submitting a CycloneDX or SPDX JSON. Returns all CVEs found across every listed component.
Instructions
Use this to audit a software bill of materials for known vulnerabilities. Provide a CycloneDX or SPDX SBOM as a JSON string. Returns all CVEs found across every listed component.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| sbom_json | Yes |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- datanexus/tools/t10.py:535-649 (handler)Main handler for the audit_sbom_vulnerabilities tool. Accepts CycloneDX/SPDX SBOM as JSON string, extracts components, queries OSV.dev batch API, and returns vulnerability audit results.
@mcp.tool() @with_timeout @verify_entitlement("T10") async def audit_sbom_vulnerabilities(sbom_json: str) -> dict: """Use this to audit a software bill of materials for known vulnerabilities. Provide a CycloneDX or SPDX SBOM as a JSON string. Returns all CVEs found across every listed component.""" import hashlib params = {"sbom_hash": hashlib.sha256(sbom_json.encode()).hexdigest()[:32]} async with AuditContext("T10", params, "1.0") as ctx: phash = make_params_hash(params) cached = get_cached("T10", phash) if cached: ctx.set_cache_hit(True) return { **cached, **standard_response_fields( ctx.query_hash, cached.get("data_as_of", ""), cached.get("ingest_healthy", True), ), "cache_hit": True, } try: sbom_data = json.loads(sbom_json) except json.JSONDecodeError: return error_response( error_code=ErrorCode.VALIDATION_ERROR, message="Invalid JSON in sbom_json. Provide valid CycloneDX or SPDX JSON.", query_hash=ctx.query_hash, retry_after=0, ingest_healthy=True, ) components = _extract_sbom_components(sbom_data) if not components: return error_response( error_code=ErrorCode.VALIDATION_ERROR, message=( "No parseable components found in SBOM. " "Ensure CycloneDX components[] or SPDX packages[] with PURLs are present." ), query_hash=ctx.query_hash, retry_after=0, ingest_healthy=True, ) if is_tripped("osv_dev"): ctx.set_error(ErrorCode.CIRCUIT_OPEN) return error_response( error_code=ErrorCode.CIRCUIT_OPEN, message="OSV.dev currently unavailable. Try again later.", query_hash=ctx.query_hash, retry_after=300, ingest_healthy=False, ) try: async with httpx.AsyncClient( timeout=_HTTP_TIMEOUT, headers=_HTTP_HEADERS, follow_redirects=True, ) as client: batch_results = await _batch_osv_query(client, components) except httpx.TimeoutException: record_failure_sync("osv_dev") return error_response( error_code=ErrorCode.UPSTREAM_TIMEOUT, message="OSV.dev batch query timed out.", query_hash=ctx.query_hash, retry_after=30, ingest_healthy=False, ) except Exception: record_failure_sync("osv_dev") log.exception("t10.audit_sbom_vulnerabilities error") return error_response( error_code=ErrorCode.INTERNAL_ERROR, message="An internal error occurred. Please try again.", query_hash=ctx.query_hash, retry_after=0, ingest_healthy=False, ) record_success_sync("osv_dev") audit_data = _build_audit_data(components, batch_results) raw_bytes = json.dumps(audit_data).encode() phash_val = _compute_hash(raw_bytes) data_as_of = datetime.now(timezone.utc).isoformat() markdown = _build_sbom_audit_markdown(audit_data) result_data = { "status": "ok", "tool_id": "T10", "source_url": "https://api.osv.dev/v1/querybatch", "fetch_timestamp": data_as_of, "cache_hit": False, "staleness_notice": None, "sha256_hash": phash_val, "data": audit_data, "markdown_output": markdown, "disclaimer": T10_DISCLAIMER, "data_as_of": data_as_of, "ingest_healthy": True, } set_cached("T10", phash, result_data, _T10_TTL) ctx.set_cache_hit(False) log.info("t10.audit_sbom_vulnerabilities ok components=%d vuln_count=%d", len(components), audit_data.get("total_vulns", 0)) return {**result_data, **standard_response_fields(ctx.query_hash, data_as_of, True)} - datanexus/tools/security.py:20-21 (registration)Registration of audit_sbom_vulnerabilities on the 'security' FastMCP sub-server.
security.tool()(audit_sbom_vulnerabilities) security.tool()(fetch_package_licence) - datanexus/main.py:155-160 (registration)Mounting the 'security' FastMCP sub-server (which contains audit_sbom_vulnerabilities) under the 'security' namespace.
main.mount(security, namespace="security") main.mount(compliance, namespace="compliance") main.mount(domain, namespace="domain") main.mount(legal, namespace="legal") main.mount(govcon, namespace="govcon") main.mount(regulatory, namespace="regulatory") - datanexus/tools/meta.py:23-43 (registration)Tool registry entry in search_datanexus_tools meta-tool for discovery.
{"name": "security_audit_sbom_vulnerabilities", "task": "audit a software bill of materials for known vulnerabilities"}, {"name": "security_fetch_package_licence", "task": "check the open source licence for a package version"}, {"name": "compliance_fetch_npi_provider", "task": "verify a US healthcare provider by NPI number"}, {"name": "compliance_search_npi_by_name", "task": "search for a healthcare provider by name and state"}, {"name": "compliance_fetch_finra_broker", "task": "verify a financial broker or advisor registration with FINRA"}, {"name": "compliance_check_sam_exclusion", "task": "check whether a person or company is excluded from federal contracting"}, {"name": "domain_fetch_domain_rdap", "task": "look up domain registration and ownership details"}, {"name": "domain_fetch_ssl_certificate_chain", "task": "inspect the SSL certificate chain for a domain"}, {"name": "domain_fetch_dns_records", "task": "get DNS records for a domain"}, {"name": "domain_fetch_domain_history", "task": "get historical SSL certificate records for a domain"}, {"name": "legal_fetch_patent_by_number", "task": "look up a specific patent by number across US EP or WO"}, {"name": "legal_search_patents_by_keyword", "task": "search for patents by keyword to find prior art"}, {"name": "legal_fetch_patent_citations", "task": "get forward and backward citation chains for a patent"}, {"name": "legal_fetch_inventor_portfolio", "task": "get all patents filed by a specific inventor or assignee"}, {"name": "govcon_search_contract_awards", "task": "search government contract awards by keyword or agency"}, {"name": "govcon_fetch_vendor_contract_history", "task": "get the full government contract history for a specific vendor"}, {"name": "govcon_fetch_open_solicitations", "task": "find currently open government procurement opportunities"}, {"name": "regulatory_search_open_rulemakings", "task": "find open regulatory rulemakings and comment periods"}, {"name": "regulatory_fetch_docket_details", "task": "get full details for a specific regulatory docket by ID"}, {"name": "regulatory_fetch_federal_register_notices","task": "fetch recent Federal Register notices for an agency"}, ] - datanexus/tools/t10.py:950-1007 (helper)Helper functions: _extract_sbom_components parses CycloneDX/SPDX, _parse_purl parses PURL strings, _batch_osv_query calls OSV.dev batch API, _build_audit_data merges results into audit summary.
def _extract_sbom_components(sbom: dict) -> list[dict]: """Parse CycloneDX or SPDX JSON and extract (name, version, ecosystem).""" components = [] if sbom.get("bomFormat") == "CycloneDX" or "components" in sbom: for comp in sbom.get("components", []): p = _parse_purl(comp.get("purl", "")) if p: components.append(p) elif comp.get("name"): components.append({ "name": comp["name"], "version": comp.get("version", ""), "ecosystem": "", }) return components if "spdxVersion" in sbom or "packages" in sbom: for pkg in sbom.get("packages", []): for ext in pkg.get("externalRefs", []): if ext.get("referenceType") == "purl": p = _parse_purl(ext.get("referenceLocator", "")) if p: components.append(p) break else: if pkg.get("name"): components.append({ "name": pkg["name"], "version": pkg.get("versionInfo", ""), "ecosystem": "", }) return components def _parse_purl(purl: str) -> Optional[dict]: """Parse a Package URL (PURL) string into (name, version, ecosystem).""" try: if not purl.startswith("pkg:"): return None rest = purl[4:] slash = rest.find("/") if slash < 0: return None pkg_type = rest[:slash].lower() rest = rest[slash + 1:] at = rest.rfind("@") version = "" if at >= 0: version = rest[at + 1:].split("?")[0].split("#")[0] rest = rest[:at] name = rest.split("/")[-1] eco_map = { "pypi": "PyPI", "npm": "npm", "maven": "Maven", "golang": "Go", "cargo": "crates.io", "nuget": "NuGet", "gem": "RubyGems", "composer": "Packagist", } return {"name": name, "version": version, "ecosystem": eco_map.get(pkg_type, pkg_type)} except Exception: return None