# vulnerability_scanner.py (20.2 kB)
"""
Vulnerability scanner for documentation-search-enhanced MCP server.
Integrates with OSINT sources to check library security vulnerabilities.
"""
import asyncio
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Any, Optional

import httpx
class SeverityLevel(Enum):
    """Severity buckets a vulnerability can be assigned to.

    Members are declared from most to least severe; each value is the
    lower-case label used when a vulnerability is serialised.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
@dataclass
class Vulnerability:
    """A single security vulnerability reported by one of the scan sources."""

    id: str
    title: str
    description: str
    severity: SeverityLevel
    cvss_score: Optional[float]
    cve_id: Optional[str]
    affected_versions: List[str]
    fixed_version: Optional[str]
    published_date: str
    source: str  # "osv", "github", "safety", "snyk"
    references: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Return a compact, JSON-friendly view of the vulnerability.

        The description is cut to 200 characters (with a trailing "...")
        and at most three references are kept.

        Returns:
            Dict with the same keys as the dataclass fields; ``severity`` is
            rendered as the enum's string value.
        """
        # Upstream JSON can carry an explicit null for these fields, which
        # ``dict.get(key, default)`` passes through unchanged; treat that as
        # "empty" instead of raising on len()/slicing.
        description = self.description or ""
        if len(description) > 200:
            description = description[:200] + "..."
        references = list(self.references or [])[:3]  # Limit references

        return {
            "id": self.id,
            "title": self.title,
            "description": description,
            "severity": self.severity.value,
            "cvss_score": self.cvss_score,
            "cve_id": self.cve_id,
            "affected_versions": self.affected_versions,
            "fixed_version": self.fixed_version,
            "published_date": self.published_date,
            "source": self.source,
            "references": references,
        }
@dataclass
class SecurityReport:
    """Aggregated outcome of one vulnerability scan of a single library."""

    library_name: str
    ecosystem: str  # "pypi", "npm", "maven", etc.
    scan_date: str
    total_vulnerabilities: int
    critical_count: int
    high_count: int
    medium_count: int
    low_count: int
    security_score: float  # 0-100, higher is better
    recommendations: List[str]
    vulnerabilities: List[Vulnerability]
    latest_secure_version: Optional[str]

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the report into plain dicts and lists.

        The per-severity counters and the score are grouped under the
        ``"summary"`` key; every vulnerability is rendered through its own
        ``to_dict()``.
        """
        summary: Dict[str, Any] = {
            "total_vulnerabilities": self.total_vulnerabilities,
            "critical": self.critical_count,
            "high": self.high_count,
            "medium": self.medium_count,
            "low": self.low_count,
            "security_score": self.security_score,
        }

        payload: Dict[str, Any] = {}
        payload["library_name"] = self.library_name
        payload["ecosystem"] = self.ecosystem
        payload["scan_date"] = self.scan_date
        payload["summary"] = summary
        payload["latest_secure_version"] = self.latest_secure_version
        payload["recommendations"] = self.recommendations

        serialised = []
        for entry in self.vulnerabilities:
            serialised.append(entry.to_dict())
        payload["vulnerabilities"] = serialised
        return payload
class VulnerabilityScanner:
    """Collect known vulnerabilities for a package from public sources.

    A scan queries OSV, GitHub's global security advisories and (for PyPI
    packages only) a keyword heuristic on the PyPI metadata, merges the
    findings into a ``SecurityReport`` and keeps that report in an in-memory
    cache for ``cache_ttl``.
    """

    # Diagnostics go through ``logging`` instead of ``print()``: print writes
    # to stdout, which an MCP server running over the stdio transport
    # reserves for protocol messages.
    # NOTE(review): confirm which transport this server actually uses.
    _logger = logging.getLogger(__name__)

    _MAX_CACHE_ENTRIES = 100
    _MAX_REPORTED_VULNERABILITIES = 10

    # OSV ecosystem name (lower-cased) -> ecosystem identifier expected by
    # GitHub's advisory endpoint.
    _GITHUB_ECOSYSTEMS = {
        "pypi": "pip",
        "npm": "npm",
        "maven": "maven",
        "nuget": "nuget",
        "rubygems": "rubygems",
        "go": "go",
        "crates.io": "rust",
        "packagist": "composer",
        "hex": "erlang",
        "pub": "pub",
        "swifturl": "swift",
        "github actions": "actions",
    }

    # Leading numeric release ("1.10.0", optionally prefixed with "v") and
    # whatever follows it.
    _RELEASE_RE = re.compile(r"^\s*[vV]?(\d+(?:\.\d+)*)(.*)$")
    # Suffixes that mark a pre-release (sorts before the plain release).
    _PRERELEASE_RE = re.compile(
        r"^[-_.]?(?:alpha|beta|preview|pre|dev|rc|a|b|c)(?![a-z])", re.IGNORECASE
    )
    # A full git commit hash is not a version, even if it starts with digits.
    _COMMIT_HASH_RE = re.compile(r"[0-9a-fA-F]{40}")

    def __init__(self):
        self.cache = {}
        self.cache_ttl = timedelta(hours=6)  # Cache for 6 hours
        self.timeout = httpx.Timeout(30.0)
        # API endpoints
        self.osv_api = "https://api.osv.dev"
        self.github_api = "https://api.github.com"
        self.cve_api = "https://cve.circl.lu/api"

    async def scan_library(
        self, library_name: str, ecosystem: str = "PyPI"
    ) -> SecurityReport:
        """
        Comprehensive vulnerability scan for a library

        Args:
            library_name: Name of the library (e.g., "fastapi", "react")
            ecosystem: Package ecosystem ("PyPI", "npm", "Maven", etc.)

        Returns:
            SecurityReport with vulnerability details
        """
        cache_key = f"{library_name}_{ecosystem}"
        # Check cache first
        if self._is_cached(cache_key):
            return self.cache[cache_key]["data"]

        # Scan multiple sources in parallel
        scan_tasks = [
            self._scan_osv(library_name, ecosystem),
            self._scan_github_advisories(library_name, ecosystem),
            (
                self._scan_safety_db(library_name)
                if ecosystem.lower() == "pypi"
                else self._empty_scan()
            ),
        ]
        # return_exceptions=True hands failures back as values, so no
        # surrounding try/except is needed here.
        results = await asyncio.gather(*scan_tasks, return_exceptions=True)

        vulnerabilities = []
        for result in results:
            if isinstance(result, list):
                vulnerabilities.extend(result)
            elif isinstance(result, Exception):
                self._logger.warning("Scan error: %s", result)

        # The same advisory is commonly published under several IDs and by
        # several sources; count it once.
        vulnerabilities = self._deduplicate(vulnerabilities)

        report = self._generate_security_report(
            library_name, ecosystem, vulnerabilities
        )
        # NOTE(review): the individual scanners swallow their own errors, so
        # a scan in which every source failed is indistinguishable from a
        # clean one and is cached like any other result.
        self._cache_result(cache_key, report)
        return report

    def _deduplicate(self, vulnerabilities: List[Vulnerability]) -> List[Vulnerability]:
        """Drop entries that share an ID or CVE ID with an earlier entry.

        The first occurrence wins, unless it has no CVSS score and a later
        duplicate has one, in which case the scored entry takes its place.
        """
        unique: List[Vulnerability] = []
        position_by_key: Dict[str, int] = {}
        for vuln in vulnerabilities:
            keys = [key for key in (vuln.id, vuln.cve_id) if key]
            position = next(
                (position_by_key[key] for key in keys if key in position_by_key),
                None,
            )
            if position is None:
                position = len(unique)
                unique.append(vuln)
            elif unique[position].cvss_score is None and vuln.cvss_score is not None:
                unique[position] = vuln
            for key in keys:
                position_by_key.setdefault(key, position)
        return unique

    async def _scan_osv(self, library_name: str, ecosystem: str) -> List[Vulnerability]:
        """Scan OSV (Open Source Vulnerabilities) database.

        Best effort: any failure is logged and yields the entries collected
        so far (possibly none).
        """
        vulnerabilities = []
        query_data = {"package": {"name": library_name, "ecosystem": ecosystem}}
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.osv_api}/v1/query", json=query_data
                )
                if response.status_code != 200:
                    self._logger.warning(
                        "OSV scan for %s returned HTTP %s",
                        library_name,
                        response.status_code,
                    )
                    return vulnerabilities
                for vuln_data in response.json().get("vulns", []):
                    # Withdrawn entries are no longer considered valid.
                    if vuln_data.get("withdrawn"):
                        continue
                    vulnerability = self._parse_osv_vulnerability(vuln_data)
                    if vulnerability:
                        vulnerabilities.append(vulnerability)
        except Exception as e:
            self._logger.warning("OSV scan error for %s: %s", library_name, e)
        return vulnerabilities

    async def _scan_github_advisories(
        self, library_name: str, ecosystem: str
    ) -> List[Vulnerability]:
        """Scan GitHub Security Advisories.

        Uses the global advisories endpoint filtered by package and
        ecosystem. Ecosystems without a GitHub equivalent yield an empty
        list. Best effort: failures are logged, not raised.
        """
        vulnerabilities = []
        github_ecosystem = self._GITHUB_ECOSYSTEMS.get(ecosystem.lower())
        if github_ecosystem is None:
            return vulnerabilities
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.github_api}/advisories",
                    params={
                        "affects": library_name,
                        "ecosystem": github_ecosystem,
                        "per_page": 100,
                    },
                    headers={"Accept": "application/vnd.github+json"},
                )
                if response.status_code != 200:
                    self._logger.warning(
                        "GitHub Advisory scan for %s returned HTTP %s",
                        library_name,
                        response.status_code,
                    )
                    return vulnerabilities
                advisories = response.json()
                if not isinstance(advisories, list):
                    return vulnerabilities
                for advisory in advisories:
                    vuln = self._parse_github_advisory(advisory, library_name)
                    if vuln:
                        vulnerabilities.append(vuln)
        except Exception as e:
            self._logger.warning(
                "GitHub Advisory scan error for %s: %s", library_name, e
            )
        return vulnerabilities

    def _parse_github_advisory(
        self, advisory: Dict[str, Any], library_name: str
    ) -> Optional[Vulnerability]:
        """Convert one GitHub advisory object into a ``Vulnerability``.

        Returns None for withdrawn advisories, advisories without a GHSA ID,
        and anything that cannot be parsed.
        """
        try:
            ghsa_id = advisory.get("ghsa_id")
            if not ghsa_id or advisory.get("withdrawn_at"):
                return None

            # An advisory can cover several packages; keep only the version
            # data that belongs to the package being scanned.
            wanted = library_name.lower()
            affected_versions = []
            fixed_version = None
            for entry in advisory.get("vulnerabilities") or []:
                package = entry.get("package") or {}
                if (package.get("name") or "").lower() != wanted:
                    continue
                version_range = entry.get("vulnerable_version_range")
                if version_range:
                    affected_versions.append(version_range)
                patched = entry.get("first_patched_version")
                if isinstance(patched, dict):
                    patched = patched.get("identifier")
                if patched and fixed_version is None:
                    fixed_version = patched

            raw_score = (advisory.get("cvss") or {}).get("score")
            cvss_score = (
                float(raw_score)
                if isinstance(raw_score, (int, float)) and raw_score > 0
                else None
            )

            html_url = advisory.get("html_url")
            references = [html_url] if html_url else []
            references.extend(
                ref
                for ref in advisory.get("references") or []
                if isinstance(ref, str) and ref != html_url
            )

            return Vulnerability(
                id=ghsa_id,
                title=advisory.get("summary") or f"GitHub Advisory for {library_name}",
                description=advisory.get("description") or "Security advisory found",
                severity=(
                    self._severity_from_label(advisory.get("severity"))
                    or SeverityLevel.MEDIUM  # Default severity
                ),
                cvss_score=cvss_score,
                cve_id=advisory.get("cve_id"),
                affected_versions=affected_versions or ["unknown"],
                fixed_version=fixed_version,
                published_date=advisory.get("published_at") or "",
                source="github",
                references=references,
            )
        except Exception as e:
            self._logger.warning("Error parsing GitHub advisory: %s", e)
            return None

    async def _scan_safety_db(self, library_name: str) -> List[Vulnerability]:
        """Scan Python Safety Database (for PyPI packages).

        Placeholder heuristic: fetches the package metadata from PyPI and
        emits one INFO-level entry when the description mentions
        "security". It does not consult an actual vulnerability database.
        """
        vulnerabilities = []
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"https://pypi.org/pypi/{library_name}/json"
                )
                if response.status_code == 200:
                    info = response.json().get("info") or {}
                    # The description may be null in the JSON payload.
                    description = info.get("description") or ""
                    if "security" in description.lower():
                        vulnerabilities.append(
                            Vulnerability(
                                id=f"PYSA-{library_name}",
                                title=f"Potential security issue in {library_name}",
                                description="Security-related keywords found in package description",
                                severity=SeverityLevel.INFO,
                                cvss_score=None,
                                cve_id=None,
                                affected_versions=["unknown"],
                                fixed_version=None,
                                published_date=datetime.now().isoformat(),
                                source="safety",
                                references=[f"https://pypi.org/project/{library_name}/"],
                            )
                        )
        except Exception as e:
            self._logger.warning("Safety DB scan error for %s: %s", library_name, e)
        return vulnerabilities

    async def _empty_scan(self) -> List[Vulnerability]:
        """Empty scan for unsupported ecosystems"""
        return []

    def _parse_osv_vulnerability(
        self, vuln_data: Dict[str, Any]
    ) -> Optional[Vulnerability]:
        """Parse OSV vulnerability data.

        Severity comes from the CVSS score when one can be determined,
        otherwise from a textual severity label in the entry, otherwise it
        defaults to MEDIUM. Returns None if the entry cannot be parsed.
        """
        try:
            cvss_score = self._extract_cvss_score(vuln_data.get("severity"))
            if cvss_score is not None:
                severity = self._severity_from_score(cvss_score)
            else:
                severity = (
                    self._severity_from_label(self._find_osv_severity_label(vuln_data))
                    or SeverityLevel.MEDIUM  # Default
                )

            # Extract affected versions
            affected_versions = []
            for affected in vuln_data.get("affected") or []:
                for range_info in affected.get("ranges") or []:
                    for event in range_info.get("events") or []:
                        if "introduced" in event:
                            affected_versions.append(f">={event['introduced']}")
                        elif "fixed" in event:
                            affected_versions.append(f"<{event['fixed']}")

            # Extract references
            references = [
                ref["url"] for ref in vuln_data.get("references") or [] if "url" in ref
            ]

            return Vulnerability(
                id=vuln_data.get("id") or "",
                title=vuln_data.get("summary") or "",
                description=vuln_data.get("details") or "",
                severity=severity,
                cvss_score=cvss_score,
                cve_id=self._extract_cve_id(vuln_data),
                affected_versions=affected_versions,
                fixed_version=self._extract_fixed_version(vuln_data),
                published_date=vuln_data.get("published") or "",
                source="osv",
                references=references,
            )
        except Exception as e:
            self._logger.warning("Error parsing OSV vulnerability: %s", e)
            return None

    def _extract_cvss_score(self, severity_info: Any) -> Optional[float]:
        """Return a numeric CVSS score from an OSV ``severity`` list.

        OSV stores the score as a CVSS vector string, so a plain ``float()``
        conversion fails for it; CVSS v3.x vectors are evaluated instead.
        Entries that are neither numeric nor a v3.x vector are skipped.
        """
        if not isinstance(severity_info, list):
            return None
        for entry in severity_info:
            if not isinstance(entry, dict):
                continue
            raw = entry.get("score")
            if raw is None or raw == "":
                continue
            try:
                return float(raw)
            except (TypeError, ValueError):
                computed = self._cvss3_base_score(raw)
                if computed is not None:
                    return computed
        return None

    @staticmethod
    def _cvss3_base_score(vector: Any) -> Optional[float]:
        """Compute the CVSS v3.x base score for a vector string.

        Returns None when ``vector`` is not a v3.x vector or lacks a valid
        value for any base metric.
        """
        if not isinstance(vector, str) or not vector.startswith("CVSS:3."):
            return None
        metrics = dict(
            part.split(":", 1) for part in vector.split("/")[1:] if ":" in part
        )
        try:
            scope_changed = {"U": False, "C": True}[metrics["S"]]
            attack_vector = {"N": 0.85, "A": 0.62, "L": 0.55, "P": 0.2}[metrics["AV"]]
            attack_complexity = {"L": 0.77, "H": 0.44}[metrics["AC"]]
            # Privileges Required is weighted differently when scope changes.
            privileges = (
                {"N": 0.85, "L": 0.68, "H": 0.5}
                if scope_changed
                else {"N": 0.85, "L": 0.62, "H": 0.27}
            )[metrics["PR"]]
            user_interaction = {"N": 0.85, "R": 0.62}[metrics["UI"]]
            impact_weights = {"H": 0.56, "L": 0.22, "N": 0.0}
            conf = impact_weights[metrics["C"]]
            integ = impact_weights[metrics["I"]]
            avail = impact_weights[metrics["A"]]
        except KeyError:
            return None

        iss = 1 - (1 - conf) * (1 - integ) * (1 - avail)
        if scope_changed:
            impact = 7.52 * (iss - 0.029) - 3.25 * (iss - 0.02) ** 15
        else:
            impact = 6.42 * iss
        if impact <= 0:
            return 0.0

        exploitability = (
            8.22 * attack_vector * attack_complexity * privileges * user_interaction
        )
        total = impact + exploitability
        if scope_changed:
            total *= 1.08
        total = min(total, 10)

        # "Roundup" as defined by the CVSS v3.1 specification: smallest
        # one-decimal number >= total, computed on integers to avoid
        # floating-point artefacts.
        scaled = round(total * 100000)
        if scaled % 10000 == 0:
            return scaled / 100000.0
        return (scaled // 10000 + 1) / 10.0

    @staticmethod
    def _severity_from_score(cvss_score: float) -> SeverityLevel:
        """Map a CVSS score to a severity bucket (>=9, >=7, >=4, else LOW)."""
        if cvss_score >= 9.0:
            return SeverityLevel.CRITICAL
        if cvss_score >= 7.0:
            return SeverityLevel.HIGH
        if cvss_score >= 4.0:
            return SeverityLevel.MEDIUM
        return SeverityLevel.LOW

    @staticmethod
    def _severity_from_label(label: Any) -> Optional[SeverityLevel]:
        """Map a textual severity label to a bucket; None if unrecognised."""
        if not isinstance(label, str):
            return None
        return {
            "critical": SeverityLevel.CRITICAL,
            "high": SeverityLevel.HIGH,
            "moderate": SeverityLevel.MEDIUM,
            "medium": SeverityLevel.MEDIUM,
            "low": SeverityLevel.LOW,
        }.get(label.strip().lower())

    @staticmethod
    def _find_osv_severity_label(vuln_data: Dict[str, Any]) -> Optional[str]:
        """Look for a textual severity label in an OSV entry's specific fields."""
        containers = [vuln_data]
        containers.extend(vuln_data.get("affected") or [])
        for container in containers:
            for field in ("database_specific", "ecosystem_specific"):
                label = (container.get(field) or {}).get("severity")
                if isinstance(label, str) and label:
                    return label
        return None

    def _extract_cve_id(self, vuln_data: Dict[str, Any]) -> Optional[str]:
        """Extract CVE ID from vulnerability data"""
        for alias in vuln_data.get("aliases") or []:
            if isinstance(alias, str) and alias.startswith("CVE-"):
                return alias
        return None

    def _extract_fixed_version(self, vuln_data: Dict[str, Any]) -> Optional[str]:
        """Extract fixed version from vulnerability data.

        Ranges of type "GIT" carry commit hashes rather than versions, so
        the first fix from any other range type is preferred; a GIT fix is
        returned only when nothing else is available.
        """
        git_fallback = None
        for affected in vuln_data.get("affected") or []:
            for range_info in affected.get("ranges") or []:
                for event in range_info.get("events") or []:
                    fixed = event.get("fixed")
                    if not fixed:
                        continue
                    if range_info.get("type") != "GIT":
                        return fixed
                    if git_fallback is None:
                        git_fallback = fixed
        return git_fallback

    def _generate_security_report(
        self, library_name: str, ecosystem: str, vulnerabilities: List[Vulnerability]
    ) -> SecurityReport:
        """Generate comprehensive security report.

        Counts and score are based on every vulnerability passed in; the
        report itself lists only the most severe ones.
        """
        # Count vulnerabilities by severity
        critical_count = sum(
            1 for v in vulnerabilities if v.severity == SeverityLevel.CRITICAL
        )
        high_count = sum(1 for v in vulnerabilities if v.severity == SeverityLevel.HIGH)
        medium_count = sum(
            1 for v in vulnerabilities if v.severity == SeverityLevel.MEDIUM
        )
        low_count = sum(1 for v in vulnerabilities if v.severity == SeverityLevel.LOW)

        # Calculate security score (0-100, higher is better)
        security_score = self._calculate_security_score(
            critical_count, high_count, medium_count, low_count
        )
        recommendations = self._generate_recommendations(
            library_name, vulnerabilities, security_score
        )
        latest_secure_version = self._find_latest_secure_version(vulnerabilities)

        # Order by severity (then CVSS score) before truncating, so the
        # listed entries really are the most severe ones. sorted() is
        # stable, so ties keep their source order.
        severity_rank = {
            SeverityLevel.CRITICAL: 0,
            SeverityLevel.HIGH: 1,
            SeverityLevel.MEDIUM: 2,
            SeverityLevel.LOW: 3,
            SeverityLevel.INFO: 4,
        }
        ranked = sorted(
            vulnerabilities,
            key=lambda v: (severity_rank.get(v.severity, len(severity_rank)), -(v.cvss_score or 0.0)),
        )

        return SecurityReport(
            library_name=library_name,
            ecosystem=ecosystem,
            scan_date=datetime.now().isoformat(),
            total_vulnerabilities=len(vulnerabilities),
            critical_count=critical_count,
            high_count=high_count,
            medium_count=medium_count,
            low_count=low_count,
            security_score=security_score,
            recommendations=recommendations,
            vulnerabilities=ranked[: self._MAX_REPORTED_VULNERABILITIES],
            latest_secure_version=latest_secure_version,
        )

    def _calculate_security_score(
        self, critical: int, high: int, medium: int, low: int
    ) -> float:
        """Calculate security score based on vulnerability counts.

        Starts at 100 and deducts 25 / 15 / 5 / 1 points per critical /
        high / medium / low finding, never going below 0.
        """
        score = 100.0
        score -= critical * 25  # Critical: -25 points each
        score -= high * 15  # High: -15 points each
        score -= medium * 5  # Medium: -5 points each
        score -= low * 1  # Low: -1 point each
        return max(0.0, score)

    def _generate_recommendations(
        self,
        library_name: str,
        vulnerabilities: List[Vulnerability],
        security_score: float,
    ) -> List[str]:
        """Generate security recommendations (at most five)."""
        recommendations = []
        if security_score < 50:
            recommendations.append(
                "🚨 High security risk - Consider alternative libraries"
            )
        elif security_score < 70:
            recommendations.append("⚠️ Moderate security risk - Monitor for updates")
        elif security_score < 90:
            recommendations.append("✅ Generally secure - Keep updated")
        else:
            recommendations.append("🛡️ Excellent security record")

        # Specific recommendations based on vulnerabilities
        if any(v.severity == SeverityLevel.CRITICAL for v in vulnerabilities):
            recommendations.append(
                "🔥 Update immediately - Critical vulnerabilities found"
            )
        fixed_versions = [v.fixed_version for v in vulnerabilities if v.fixed_version]
        if fixed_versions:
            # Compare as versions, not as strings ("1.10.0" > "1.9.0").
            latest_fix = max(fixed_versions, key=self._version_sort_key)
            recommendations.append(f"📦 Update to version {latest_fix} or later")
        if len(vulnerabilities) > 5:
            recommendations.append(
                "📊 Many vulnerabilities found - Consider security audit"
            )
        return recommendations[:5]  # Limit recommendations

    def _find_latest_secure_version(
        self, vulnerabilities: List[Vulnerability]
    ) -> Optional[str]:
        """Return the highest fixed version among the vulnerabilities, if any."""
        fixed_versions = [v.fixed_version for v in vulnerabilities if v.fixed_version]
        if fixed_versions:
            return max(fixed_versions, key=self._version_sort_key)
        return None

    @classmethod
    def _version_sort_key(cls, version: Any) -> tuple:
        """Build a sort key that orders version strings numerically.

        Strings without a leading numeric release (and full commit hashes)
        sort below every real version; a pre-release sorts below the release
        it precedes. This is a pragmatic ordering, not a full implementation
        of any one ecosystem's version specification.
        """
        text = str(version).strip()
        match = (
            None if cls._COMMIT_HASH_RE.fullmatch(text) else cls._RELEASE_RE.match(text)
        )
        if match is None:
            return (0, (), False, text)
        release = [int(part) for part in match.group(1).split(".")]
        # "1.10" and "1.10.0" denote the same release.
        while len(release) > 1 and release[-1] == 0:
            release.pop()
        suffix = match.group(2).strip()
        is_prerelease = bool(cls._PRERELEASE_RE.match(suffix))
        return (1, tuple(release), not is_prerelease, suffix)

    def _is_cached(self, cache_key: str) -> bool:
        """Check if result is cached and still valid"""
        entry = self.cache.get(cache_key)
        if entry is None:
            return False
        return datetime.now() - entry["timestamp"] < self.cache_ttl

    def _cache_result(self, cache_key: str, result: SecurityReport) -> None:
        """Cache scan result, evicting the oldest entries beyond the size cap."""
        self.cache[cache_key] = {"data": result, "timestamp": datetime.now()}
        while len(self.cache) > self._MAX_CACHE_ENTRIES:
            oldest_key = min(self.cache, key=lambda k: self.cache[k]["timestamp"])
            del self.cache[oldest_key]
class SecurityIntegration:
    """Thin convenience facade over a vulnerability scanner.

    Every method delegates to ``scanner.scan_library`` and turns scan
    failures into neutral fallback values instead of raising.
    """

    def __init__(self, scanner: VulnerabilityScanner):
        self.scanner = scanner

    async def get_security_score(
        self, library_name: str, ecosystem: str = "PyPI"
    ) -> float:
        """Return the library's security score (0-100, higher is better).

        Falls back to 50.0 when the scan raises.
        """
        neutral_score = 50.0  # Default neutral score
        try:
            scan_report = await self.scanner.scan_library(library_name, ecosystem)
            return scan_report.security_score
        except Exception:
            return neutral_score

    async def is_library_secure(
        self, library_name: str, ecosystem: str = "PyPI", threshold: float = 70.0
    ) -> bool:
        """Tell whether the library's score reaches ``threshold``."""
        current = await self.get_security_score(library_name, ecosystem)
        return current >= threshold

    async def get_security_summary(
        self, library_name: str, ecosystem: str = "PyPI"
    ) -> Dict[str, Any]:
        """Return a short dict summarising the scan.

        On failure the dict carries the error text, a score of 50.0 and the
        status "unknown".
        """
        try:
            scan_report = await self.scanner.scan_library(library_name, ecosystem)

            summary: Dict[str, Any] = {"library": library_name}
            summary["security_score"] = scan_report.security_score
            summary["total_vulnerabilities"] = scan_report.total_vulnerabilities
            summary["critical_vulnerabilities"] = scan_report.critical_count

            if scan_report.security_score >= 70:
                summary["status"] = "secure"
            else:
                summary["status"] = "at_risk"

            if scan_report.recommendations:
                summary["primary_recommendation"] = scan_report.recommendations[0]
            else:
                summary["primary_recommendation"] = "No specific recommendations"
            return summary
        except Exception as exc:
            return {
                "library": library_name,
                "security_score": 50.0,
                "error": str(exc),
                "status": "unknown",
            }
# Global instances: one shared scanner (and therefore one shared in-memory
# result cache) plus the integration facade that wraps it. Both are created
# at import time.
vulnerability_scanner = VulnerabilityScanner()
security_integration = SecurityIntegration(vulnerability_scanner)