snyk_scan_project
Scan project dependencies for security vulnerabilities to identify and address potential risks in your codebase.
Instructions
Scan entire project dependencies using Snyk.
Args:
project_path: Path to the project directory (default: current directory)
Returns:
Comprehensive security report for all project dependencies
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| project_path | No | Path to the project directory | . |
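
A minimal usage sketch, assuming the async handler shown under Implementation Reference below is invoked directly; the import path is hypothetical, and in normal use the tool is called through an MCP client:

```python
import asyncio

from my_mcp_server.tools import snyk_scan_project  # hypothetical import path


async def main():
    report = await snyk_scan_project(project_path=".")
    if "error" in report:
        print("Scan failed:", report["error"])
    else:
        # These keys come from the handler's return value shown further down.
        print("Security score:", report["summary"]["security_score"])
        print("High-priority issues:", report["summary"]["high_priority_vulnerabilities"])


asyncio.run(main())
```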
Implementation Reference
- MCP tool handler for `snyk_scan_project`: finds and parses the project's manifest file, verifies the Snyk connection, then scans all dependencies through the Snyk API via `SnykIntegration.scan_project_manifest`.

```python
async def snyk_scan_project(project_path: str = "."):
    """
    Scan entire project dependencies using Snyk.

    Args:
        project_path: Path to the project directory (default: current directory)

    Returns:
        Comprehensive security report for all project dependencies
    """
    from .snyk_integration import snyk_integration
    from .project_scanner import find_and_parse_dependencies

    try:
        # Find project dependencies
        dep_result = find_and_parse_dependencies(project_path)
        if not dep_result:
            return {
                "error": "No supported dependency files found",
                "supported_files": [
                    "pyproject.toml",
                    "requirements.txt",
                    "package.json",
                ],
                "project_path": project_path,
            }

        filename, ecosystem, dependencies = dep_result
        manifest_path = os.path.join(project_path, filename)

        # Test Snyk connection
        connection_test = await snyk_integration.test_connection()
        if connection_test["status"] != "connected":
            return {
                "error": "Snyk integration not configured",
                "details": connection_test.get("error", "Unknown error"),
            }

        # Scan the project manifest
        scan_result = await snyk_integration.scan_project_manifest(
            manifest_path, ecosystem
        )

        if "error" in scan_result:
            return scan_result

        # Enhance with additional analysis
        high_priority_vulns = [
            vuln
            for vuln in scan_result["vulnerabilities"]
            if vuln.get("severity") in ["critical", "high"]
        ]

        return {
            "project_path": project_path,
            "manifest_file": filename,
            "ecosystem": ecosystem,
            "scan_timestamp": scan_result["scan_timestamp"],
            "summary": {
                **scan_result["summary"],
                "high_priority_vulnerabilities": len(high_priority_vulns),
                # Weighted penalty off a 100-point baseline:
                # critical = 25, high = 15, medium = 5, low = 1
                "security_score": max(
                    0,
                    100
                    - (
                        len([v for v in scan_result["vulnerabilities"]
                             if v.get("severity") == "critical"]) * 25
                        + len([v for v in scan_result["vulnerabilities"]
                               if v.get("severity") == "high"]) * 15
                        + len([v for v in scan_result["vulnerabilities"]
                               if v.get("severity") == "medium"]) * 5
                        + len([v for v in scan_result["vulnerabilities"]
                               if v.get("severity") == "low"]) * 1
                    ),
                ),
            },
            "high_priority_vulnerabilities": high_priority_vulns[:10],
            "license_issues": scan_result["license_issues"],
            "remediation_summary": {
                "patches_available": len(
                    [v for v in scan_result["vulnerabilities"] if v.get("is_patchable")]
                ),
                "upgrades_available": len(
                    [v for v in scan_result["vulnerabilities"] if v.get("upgrade_path")]
                ),
                "total_fixable": len(
                    [v for v in scan_result["vulnerabilities"]
                     if v.get("is_patchable") or v.get("upgrade_path")]
                ),
            },
            "next_steps": [
                "🚨 Address all critical vulnerabilities immediately",
                "📦 Update packages with available security patches",
                "🔍 Review medium and low priority issues",
                "⚖️ Check license compliance for flagged packages",
                "🔄 Set up continuous monitoring with Snyk",
            ],
        }

    except Exception as e:
        return {"error": f"Project scan failed: {str(e)}", "project_path": project_path}
```
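
The `security_score` above is a simple weighted penalty; a standalone restatement of that arithmetic with the same weights, for clarity:

```python
from collections import Counter

# Same weights as the handler above: each finding subtracts a fixed penalty
# from a 100-point baseline, floored at 0.
SEVERITY_PENALTY = {"critical": 25, "high": 15, "medium": 5, "low": 1}


def security_score(vulnerabilities):
    counts = Counter(v.get("severity") for v in vulnerabilities)
    penalty = sum(SEVERITY_PENALTY.get(sev, 0) * n for sev, n in counts.items())
    return max(0, 100 - penalty)


# Example: 1 critical + 2 medium -> max(0, 100 - (25 + 5 + 5)) = 65
print(security_score([{"severity": "critical"}, {"severity": "medium"}, {"severity": "medium"}]))
```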
- Core helper method `scan_project_manifest` on the `SnykIntegration` class: submits the project manifest to Snyk's test API for the detected ecosystem and parses the response.

```python
async def scan_project_manifest(
    self, manifest_path: str, ecosystem: str = None
) -> Dict[str, Any]:
    """Scan project manifest file (requirements.txt, package.json, etc.)"""
    if not os.path.exists(manifest_path):
        return {"error": f"Manifest file not found: {manifest_path}"}

    # Auto-detect ecosystem if not provided
    if not ecosystem:
        if manifest_path.endswith(("requirements.txt", "pyproject.toml")):
            ecosystem = "pip"
        elif manifest_path.endswith("package.json"):
            ecosystem = "npm"
        else:
            ecosystem = "pip"  # Default

    try:
        with open(manifest_path, "r", encoding="utf-8") as f:
            file_contents = f.read()

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            test_payload = {
                "encoding": "plain",
                "files": {
                    os.path.basename(manifest_path): {"contents": file_contents}
                },
            }

            response = await client.post(
                f"{self.base_url}/v1/test/{ecosystem}",
                headers=self._get_headers(),
                json=test_payload,
            )

            if response.status_code == 200:
                data = response.json()
                return self._parse_project_scan_result(data, manifest_path)
            else:
                return {
                    "error": f"Snyk API error: {response.status_code}",
                    "details": response.text,
                }

    except Exception as e:
        return {"error": f"Failed to scan manifest: {str(e)}"}
```
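
A hedged usage sketch of the method above, assuming `SNYK_API_KEY` is set and a hypothetical import path for the module-level `snyk_integration` instance:

```python
import asyncio

from my_mcp_server.snyk_integration import snyk_integration  # hypothetical import path


async def main():
    # Ecosystem is auto-detected from the filename ("pip" for requirements.txt).
    result = await snyk_integration.scan_project_manifest("requirements.txt")
    if "error" in result:
        print("Scan failed:", result["error"])
    else:
        print(result["summary"])


asyncio.run(main())
```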
- Helper function to find and parse dependency files (`pyproject.toml`, `requirements.txt`, `package.json`) in the project directory.

```python
def find_and_parse_dependencies(
    directory: str,
) -> Optional[Tuple[str, str, Dict[str, str]]]:
    """
    Finds and parses the most relevant dependency file in a directory.

    Returns:
        A tuple of (file_path, ecosystem, dependencies_dict) or None.
    """
    supported_files = {
        "pyproject.toml": ("PyPI", parse_pyproject_toml),
        "requirements.txt": ("PyPI", parse_requirements_txt),
        "package.json": ("npm", parse_package_json),
    }

    for filename, (ecosystem, parser_func) in supported_files.items():
        file_path = os.path.join(directory, filename)
        if os.path.exists(file_path):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.read()
                dependencies = parser_func(content)
                return filename, ecosystem, dependencies
            except Exception as e:
                print(f"⚠️ Error parsing {filename}: {e}", file=sys.stderr)
                # Continue to the next file type if parsing fails
                continue

    return None
```
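
The parser functions referenced above (`parse_pyproject_toml`, `parse_requirements_txt`, `parse_package_json`) are not included in this excerpt; a hypothetical minimal `parse_requirements_txt` is sketched here only to illustrate the name-to-version mapping they are expected to return:

```python
import re
from typing import Dict


def parse_requirements_txt(content: str) -> Dict[str, str]:
    """Hypothetical minimal parser: maps package names to pinned versions ("" if unpinned)."""
    dependencies: Dict[str, str] = {}
    for line in content.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "-")):  # skip comments and pip options
            continue
        match = re.match(r"([A-Za-z0-9._-]+)\s*(?:==\s*([^\s;#]+))?", line)
        if match:
            dependencies[match.group(1)] = match.group(2) or ""
    return dependencies


print(parse_requirements_txt("requests==2.31.0\nhttpx>=0.27\n# a comment"))
# {'requests': '2.31.0', 'httpx': ''}
```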
- `SnykIntegration` class providing the core Snyk API client with authentication, response caching, and connection testing (excerpt).

```python
class SnykIntegration:
    """Snyk API integration for enterprise security scanning"""

    def __init__(self):
        self.api_key = os.getenv("SNYK_API_KEY")
        self.org_id = os.getenv("SNYK_ORG_ID")
        self.base_url = "https://api.snyk.io"
        self.rest_api_url = "https://api.snyk.io/rest"
        self.timeout = httpx.Timeout(60.0)

        # Cache for API responses
        self.cache = {}
        self.cache_ttl = timedelta(hours=6)

    def _get_headers(self) -> Dict[str, str]:
        ...  # remainder of the class omitted in this excerpt
```
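
The client reads its credentials from the environment; a minimal configuration sketch, assuming a hypothetical import path and placeholder values:

```python
import os

# Placeholders, not real credentials; set these before constructing the client.
os.environ["SNYK_API_KEY"] = "<your-snyk-api-token>"
os.environ["SNYK_ORG_ID"] = "<your-snyk-org-id>"  # optional in this excerpt

from my_mcp_server.snyk_integration import SnykIntegration  # hypothetical import path

snyk = SnykIntegration()
print(snyk.base_url)   # https://api.snyk.io
print(snyk.cache_ttl)  # 6:00:00 (cached API responses are kept for six hours)
```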