security_fetch_package_vulnerabilities
Check a software package for known security vulnerabilities by providing its name, version, and ecosystem. Returns CVE IDs, severity scores, and available patch versions.
Instructions
Use this to check whether a software package has known security vulnerabilities. Provide package name, version, and ecosystem (npm, PyPI, or Maven). Returns CVE IDs, severity scores, and available patch versions.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| package | Yes | ||
| version | Yes | ||
| ecosystem | Yes |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- datanexus/tools/t10.py:97-225 (handler)Primary handler for security_fetch_package_vulnerabilities. Queries OSV.dev API for known CVEs, applies caching, circuit breaker, and response formatting.
# ══════════════════════════════════════════════════════════════════════════════ # DATA TOOL 1 — fetch_package_vulnerabilities # ══════════════════════════════════════════════════════════════════════════════ @mcp.tool() @with_timeout @verify_entitlement("T10") async def fetch_package_vulnerabilities( package: str, version: str, ecosystem: str, ) -> dict: """Use this to check whether a software package has known security vulnerabilities. Provide package name, version, and ecosystem (npm, PyPI, or Maven). Returns CVE IDs, severity scores, and available patch versions.""" pkg_clean = package.strip() ver_clean = version.strip() eco_clean = ecosystem.strip() params = {"package": pkg_clean, "version": ver_clean, "ecosystem": eco_clean} async with AuditContext("T10", params, "1.0") as ctx: phash = make_params_hash(params) # ── 1. Cache check ──────────────────────────────────────────────────── cached = get_cached("T10", phash) if cached: ctx.set_cache_hit(True) return { **cached, **standard_response_fields( ctx.query_hash, cached.get("data_as_of", ""), cached.get("ingest_healthy", True), ), "cache_hit": True, } # ── 2. Circuit breaker ──────────────────────────────────────────────── if is_tripped("osv_dev"): archive = get_cached("T10", phash + "_archive") ctx.set_error(ErrorCode.CIRCUIT_OPEN) return { "status": "error", "tool_id": "T10", "data": archive or {}, "markdown_output": _archive_markdown(archive, pkg_clean, ver_clean, eco_clean), "staleness_notice": get_staleness_notice( "osv_dev", (archive or {}).get("data_as_of", "unknown"), ), "disclaimer": T10_DISCLAIMER, "cache_hit": False, "sha256_hash": "", **standard_response_fields(ctx.query_hash, "", False), } # ── 3. Live fetch — OSV.dev ─────────────────────────────────────────── osv_ecosystem = _normalise_osv_ecosystem(eco_clean) try: async with httpx.AsyncClient( timeout=_HTTP_TIMEOUT, headers=_HTTP_HEADERS, follow_redirects=True, ) as client: osv_data = await query_osv_for_version( client, pkg_clean, ver_clean, osv_ecosystem, ) except httpx.TimeoutException: record_failure_sync("osv_dev") return error_response( error_code=ErrorCode.UPSTREAM_TIMEOUT, message="OSV.dev timed out. Try again shortly.", query_hash=ctx.query_hash, retry_after=30, ingest_healthy=False, ) except httpx.HTTPStatusError: record_failure_sync("osv_dev") return error_response( error_code=ErrorCode.UPSTREAM_UNAVAILABLE, message="OSV.dev temporarily unavailable.", query_hash=ctx.query_hash, retry_after=60, ingest_healthy=False, ) except Exception: record_failure_sync("osv_dev") log.exception("t10.fetch_package_vulnerabilities error pkg=%s", pkg_clean) return error_response( error_code=ErrorCode.INTERNAL_ERROR, message="An internal error occurred. Please try again.", query_hash=ctx.query_hash, retry_after=0, ingest_healthy=False, ) record_success_sync("osv_dev") vulns = osv_data.get("vulns", []) # Phase 0 response-formatter fixes (query-time layer — belt-and-suspenders) vulns = _fmt_dedup_pysec_ghsa(vulns) vulns = _fmt_fix_severity_levels(vulns) osv_data["vulns"] = vulns # keep osv_data consistent for result_data raw_bytes = json.dumps(osv_data).encode() phash_val = _compute_hash(raw_bytes) data_as_of = datetime.now(timezone.utc).isoformat() markdown = _build_vuln_markdown(vulns, pkg_clean, ver_clean, eco_clean) result_data = { "status": "ok", "tool_id": "T10", "source_url": "https://api.osv.dev/v1/query", "fetch_timestamp": data_as_of, "cache_hit": False, "staleness_notice": None, "sha256_hash": phash_val, "data": osv_data, "markdown_output": markdown, "disclaimer": T10_DISCLAIMER, "data_as_of": data_as_of, "ingest_healthy": True, } set_cached("T10", phash, result_data, _T10_TTL) set_cached("T10", phash + "_archive", result_data, _T10_TTL * 24) ctx.set_cache_hit(False) log.info("t10.fetch_package_vulnerabilities ok pkg=%s ver=%s eco=%s vulns=%d", pkg_clean, ver_clean, eco_clean, len(vulns)) return {**result_data, **standard_response_fields(ctx.query_hash, data_as_of, True)} - datanexus/tools/security.py:1-21 (registration)MCP tool registration via FastMCP sub-server. Imports fetch_package_vulnerabilities from t10.py and registers it with the 'security' namespace.
""" DataNexus Security sub-server — T10 tools. Sprint 3 P01: mcp-tool registrations only. Tool logic lives in t10.py. """ from fastmcp import FastMCP from datanexus.tools.t10 import ( fetch_package_vulnerabilities, fetch_dependency_graph, fetch_cve_detail, audit_sbom_vulnerabilities, fetch_package_licence, ) security = FastMCP("DataNexus Security") security.tool()(fetch_package_vulnerabilities) security.tool()(fetch_dependency_graph) security.tool()(fetch_cve_detail) security.tool()(audit_sbom_vulnerabilities) security.tool()(fetch_package_licence) - datanexus/main.py:1-23 (registration)Top-level server mounts the 'security' sub-server, exposing fetch_package_vulnerabilities under the 'security' namespace.
""" DataNexus MCP — Sprint 3 P01 entry point. Spec: DataNexus_MCP_Spec_v7_5.docx (authoritative) Transport: streamable-http (CLAUDE.md rule — SSE deprecated April 2026) Server: datanexusmcp.com | Hetzner CAX11 | 178.104.251.70 Registered tools (29 total): nonprofit (3): fetch_nonprofit_by_ein, search_nonprofits_by_name, fetch_charity_uk security (5): fetch_package_vulnerabilities, fetch_dependency_graph, fetch_cve_detail, audit_sbom_vulnerabilities, fetch_package_licence compliance (4): fetch_npi_provider, search_npi_by_name, fetch_finra_broker, check_sam_exclusion domain (4): fetch_domain_rdap, fetch_ssl_certificate_chain, fetch_dns_records, fetch_domain_history legal (4): fetch_patent_by_number, search_patents_by_keyword, fetch_patent_citations, fetch_inventor_portfolio govcon (3): search_contract_awards, fetch_vendor_contract_history, fetch_open_solicitations regulatory (3): search_open_rulemakings, fetch_docket_details, fetch_federal_register_notices Shared (3): report_feedback, report_mcpize_link, validate_tool_output - datanexus/tools/meta.py:1-43 (registration)Tool name and description in the TOOL_REGISTRY used by the search meta-tool for keyword matching.
""" P02 — search_datanexus_tools meta-tool. Keyword overlap scoring against TOOL_REGISTRY task descriptions. Analytics: INCR redis key analytics:search:{YYYY-MM-DD} — no raw query text stored. """ import logging import re from datetime import date from typing import Optional from datanexus.cache import get_redis log = logging.getLogger(__name__) TOOL_REGISTRY = [ {"name": "nonprofit_fetch_nonprofit_by_ein", "task": "research a US charity or nonprofit by EIN number"}, {"name": "nonprofit_search_nonprofits_by_name", "task": "search for nonprofits or charities by organisation name"}, {"name": "nonprofit_fetch_charity_uk", "task": "look up a UK registered charity by number or name"}, {"name": "security_fetch_package_vulnerabilities", "task": "check a software package for known CVEs and security vulnerabilities"}, {"name": "security_fetch_dependency_graph", "task": "get the full dependency tree for a software package"}, {"name": "security_fetch_cve_detail", "task": "get full detail on a specific CVE vulnerability by ID"}, {"name": "security_audit_sbom_vulnerabilities", "task": "audit a software bill of materials for known vulnerabilities"}, {"name": "security_fetch_package_licence", "task": "check the open source licence for a package version"}, {"name": "compliance_fetch_npi_provider", "task": "verify a US healthcare provider by NPI number"}, {"name": "compliance_search_npi_by_name", "task": "search for a healthcare provider by name and state"}, {"name": "compliance_fetch_finra_broker", "task": "verify a financial broker or advisor registration with FINRA"}, {"name": "compliance_check_sam_exclusion", "task": "check whether a person or company is excluded from federal contracting"}, {"name": "domain_fetch_domain_rdap", "task": "look up domain registration and ownership details"}, {"name": "domain_fetch_ssl_certificate_chain", "task": "inspect the SSL certificate chain for a domain"}, {"name": "domain_fetch_dns_records", "task": "get DNS records for a domain"}, {"name": "domain_fetch_domain_history", "task": "get historical SSL certificate records for a domain"}, {"name": "legal_fetch_patent_by_number", "task": "look up a specific patent by number across US EP or WO"}, {"name": "legal_search_patents_by_keyword", "task": "search for patents by keyword to find prior art"}, {"name": "legal_fetch_patent_citations", "task": "get forward and backward citation chains for a patent"}, {"name": "legal_fetch_inventor_portfolio", "task": "get all patents filed by a specific inventor or assignee"}, {"name": "govcon_search_contract_awards", "task": "search government contract awards by keyword or agency"}, {"name": "govcon_fetch_vendor_contract_history", "task": "get the full government contract history for a specific vendor"}, {"name": "govcon_fetch_open_solicitations", "task": "find currently open government procurement opportunities"}, {"name": "regulatory_search_open_rulemakings", "task": "find open regulatory rulemakings and comment periods"}, {"name": "regulatory_fetch_docket_details", "task": "get full details for a specific regulatory docket by ID"}, {"name": "regulatory_fetch_federal_register_notices","task": "fetch recent Federal Register notices for an agency"}, ] - datanexus/core/prewarm.py:54-72 (helper)Pre-warm helper that seeds lodash:4.17.21:npm into Redis cache at startup.
async def _fetch_t10_package_vulnerabilities(raw_param: str) -> dict: from datanexus.tools.t10 import fetch_package_vulnerabilities parts = raw_param.split(":", 2) if len(parts) != 3: raise ValueError(f"Invalid package param: {raw_param!r} — expected pkg:ver:ecosystem") return await fetch_package_vulnerabilities(parts[0], parts[1], parts[2]) async def _fetch_t10_cve_detail(cve_id: str) -> dict: from datanexus.tools.t10 import fetch_cve_detail return await fetch_cve_detail(cve_id) async def _fetch_t10_package_licence(raw_param: str) -> dict: from datanexus.tools.t10 import fetch_package_licence parts = raw_param.split(":", 2) if len(parts) != 3: raise ValueError(f"Invalid package param: {raw_param!r} — expected pkg:ver:ecosystem") return await fetch_package_licence(parts[0], parts[1], parts[2])