Skip to main content
Glama

Documentation Search MCP Server

vulnerability_scanner.py20.2 kB
""" Vulnerability scanner for documentation-search-enhanced MCP server. Integrates with OSINT sources to check library security vulnerabilities. """ import asyncio import httpx from datetime import datetime, timedelta from typing import Dict, List, Any, Optional from dataclasses import dataclass from enum import Enum class SeverityLevel(Enum): """Vulnerability severity levels""" CRITICAL = "critical" HIGH = "high" MEDIUM = "medium" LOW = "low" INFO = "info" @dataclass class Vulnerability: """Represents a security vulnerability""" id: str title: str description: str severity: SeverityLevel cvss_score: Optional[float] cve_id: Optional[str] affected_versions: List[str] fixed_version: Optional[str] published_date: str source: str # "osv", "github", "safety", "snyk" references: List[str] def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "title": self.title, "description": ( self.description[:200] + "..." if len(self.description) > 200 else self.description ), "severity": self.severity.value, "cvss_score": self.cvss_score, "cve_id": self.cve_id, "affected_versions": self.affected_versions, "fixed_version": self.fixed_version, "published_date": self.published_date, "source": self.source, "references": self.references[:3], # Limit references } @dataclass class SecurityReport: """Comprehensive security report for a library""" library_name: str ecosystem: str # "pypi", "npm", "maven", etc. scan_date: str total_vulnerabilities: int critical_count: int high_count: int medium_count: int low_count: int security_score: float # 0-100, higher is better recommendations: List[str] vulnerabilities: List[Vulnerability] latest_secure_version: Optional[str] def to_dict(self) -> Dict[str, Any]: return { "library_name": self.library_name, "ecosystem": self.ecosystem, "scan_date": self.scan_date, "summary": { "total_vulnerabilities": self.total_vulnerabilities, "critical": self.critical_count, "high": self.high_count, "medium": self.medium_count, "low": self.low_count, "security_score": self.security_score, }, "latest_secure_version": self.latest_secure_version, "recommendations": self.recommendations, "vulnerabilities": [vuln.to_dict() for vuln in self.vulnerabilities], } class VulnerabilityScanner: """Main vulnerability scanner class""" def __init__(self): self.cache = {} self.cache_ttl = timedelta(hours=6) # Cache for 6 hours self.timeout = httpx.Timeout(30.0) # API endpoints self.osv_api = "https://api.osv.dev" self.github_api = "https://api.github.com" self.cve_api = "https://cve.circl.lu/api" async def scan_library( self, library_name: str, ecosystem: str = "PyPI" ) -> SecurityReport: """ Comprehensive vulnerability scan for a library Args: library_name: Name of the library (e.g., "fastapi", "react") ecosystem: Package ecosystem ("PyPI", "npm", "Maven", etc.) Returns: SecurityReport with vulnerability details """ cache_key = f"{library_name}_{ecosystem}" # Check cache first if self._is_cached(cache_key): return self.cache[cache_key]["data"] vulnerabilities = [] # Scan multiple sources in parallel scan_tasks = [ self._scan_osv(library_name, ecosystem), self._scan_github_advisories(library_name, ecosystem), ( self._scan_safety_db(library_name) if ecosystem.lower() == "pypi" else self._empty_scan() ), ] try: results = await asyncio.gather(*scan_tasks, return_exceptions=True) for result in results: if isinstance(result, list): vulnerabilities.extend(result) elif isinstance(result, Exception): print(f"Scan error: {result}") except Exception as e: print(f"Vulnerability scan failed for {library_name}: {e}") # Generate security report report = self._generate_security_report( library_name, ecosystem, vulnerabilities ) # Cache the result self._cache_result(cache_key, report) return report async def _scan_osv(self, library_name: str, ecosystem: str) -> List[Vulnerability]: """Scan OSV (Open Source Vulnerabilities) database""" vulnerabilities = [] try: async with httpx.AsyncClient(timeout=self.timeout) as client: # OSV API query query_data = {"package": {"name": library_name, "ecosystem": ecosystem}} response = await client.post( f"{self.osv_api}/v1/query", json=query_data ) if response.status_code == 200: data = response.json() for vuln_data in data.get("vulns", []): vulnerability = self._parse_osv_vulnerability(vuln_data) if vulnerability: vulnerabilities.append(vulnerability) except Exception as e: print(f"OSV scan error for {library_name}: {e}") return vulnerabilities async def _scan_github_advisories( self, library_name: str, ecosystem: str ) -> List[Vulnerability]: """Scan GitHub Security Advisories""" vulnerabilities = [] try: async with httpx.AsyncClient(timeout=self.timeout) as client: # GitHub GraphQL API would be more comprehensive, but REST API is simpler search_query = f"type:security-advisories {library_name}" response = await client.get( f"{self.github_api}/search/repositories", params={"q": search_query, "per_page": 10}, headers={"Accept": "application/vnd.github+json"}, ) if response.status_code == 200: data = response.json() # This is a simplified implementation # In production, you'd use GitHub's Security Advisory API for item in data.get("items", []): if library_name.lower() in item.get("full_name", "").lower(): vuln = Vulnerability( id=f"GHSA-{item['id']}", title=f"GitHub Advisory for {library_name}", description=item.get( "description", "Security advisory found" ), severity=SeverityLevel.MEDIUM, # Default severity cvss_score=None, cve_id=None, affected_versions=["unknown"], fixed_version=None, published_date=item.get("created_at", ""), source="github", references=[item.get("html_url", "")], ) vulnerabilities.append(vuln) except Exception as e: print(f"GitHub Advisory scan error for {library_name}: {e}") return vulnerabilities async def _scan_safety_db(self, library_name: str) -> List[Vulnerability]: """Scan Python Safety Database (for PyPI packages)""" vulnerabilities = [] try: # Using Safety CLI database approach # In a real implementation, you might use their API or local database async with httpx.AsyncClient(timeout=self.timeout) as client: # PyPA Safety Database (simplified example) response = await client.get( f"https://pypi.org/pypi/{library_name}/json" ) if response.status_code == 200: data = response.json() # Check for known vulnerable versions # This is a placeholder - real implementation would check Safety DB info = data.get("info", {}) if "security" in info.get("description", "").lower(): vuln = Vulnerability( id=f"PYSA-{library_name}", title=f"Potential security issue in {library_name}", description="Security-related keywords found in package description", severity=SeverityLevel.INFO, cvss_score=None, cve_id=None, affected_versions=["unknown"], fixed_version=None, published_date=datetime.now().isoformat(), source="safety", references=[f"https://pypi.org/project/{library_name}/"], ) vulnerabilities.append(vuln) except Exception as e: print(f"Safety DB scan error for {library_name}: {e}") return vulnerabilities async def _empty_scan(self) -> List[Vulnerability]: """Empty scan for unsupported ecosystems""" return [] def _parse_osv_vulnerability( self, vuln_data: Dict[str, Any] ) -> Optional[Vulnerability]: """Parse OSV vulnerability data""" try: # Extract severity severity = SeverityLevel.MEDIUM # Default cvss_score = None if "severity" in vuln_data: severity_info = vuln_data["severity"] if isinstance(severity_info, list) and severity_info: severity_data = severity_info[0] score = severity_data.get("score") if score: cvss_score = float(score) if cvss_score >= 9.0: severity = SeverityLevel.CRITICAL elif cvss_score >= 7.0: severity = SeverityLevel.HIGH elif cvss_score >= 4.0: severity = SeverityLevel.MEDIUM else: severity = SeverityLevel.LOW # Extract affected versions affected_versions = [] for affected in vuln_data.get("affected", []): ranges = affected.get("ranges", []) for range_info in ranges: events = range_info.get("events", []) for event in events: if "introduced" in event: affected_versions.append(f">={event['introduced']}") elif "fixed" in event: affected_versions.append(f"<{event['fixed']}") # Extract references references = [] for ref in vuln_data.get("references", []): if "url" in ref: references.append(ref["url"]) return Vulnerability( id=vuln_data.get("id", ""), title=vuln_data.get("summary", ""), description=vuln_data.get("details", ""), severity=severity, cvss_score=cvss_score, cve_id=self._extract_cve_id(vuln_data), affected_versions=affected_versions, fixed_version=self._extract_fixed_version(vuln_data), published_date=vuln_data.get("published", ""), source="osv", references=references, ) except Exception as e: print(f"Error parsing OSV vulnerability: {e}") return None def _extract_cve_id(self, vuln_data: Dict[str, Any]) -> Optional[str]: """Extract CVE ID from vulnerability data""" aliases = vuln_data.get("aliases", []) for alias in aliases: if alias.startswith("CVE-"): return alias return None def _extract_fixed_version(self, vuln_data: Dict[str, Any]) -> Optional[str]: """Extract fixed version from vulnerability data""" for affected in vuln_data.get("affected", []): ranges = affected.get("ranges", []) for range_info in ranges: events = range_info.get("events", []) for event in events: if "fixed" in event: return event["fixed"] return None def _generate_security_report( self, library_name: str, ecosystem: str, vulnerabilities: List[Vulnerability] ) -> SecurityReport: """Generate comprehensive security report""" # Count vulnerabilities by severity critical_count = sum( 1 for v in vulnerabilities if v.severity == SeverityLevel.CRITICAL ) high_count = sum(1 for v in vulnerabilities if v.severity == SeverityLevel.HIGH) medium_count = sum( 1 for v in vulnerabilities if v.severity == SeverityLevel.MEDIUM ) low_count = sum(1 for v in vulnerabilities if v.severity == SeverityLevel.LOW) # Calculate security score (0-100, higher is better) security_score = self._calculate_security_score( critical_count, high_count, medium_count, low_count ) # Generate recommendations recommendations = self._generate_recommendations( library_name, vulnerabilities, security_score ) # Find latest secure version (placeholder) latest_secure_version = self._find_latest_secure_version(vulnerabilities) return SecurityReport( library_name=library_name, ecosystem=ecosystem, scan_date=datetime.now().isoformat(), total_vulnerabilities=len(vulnerabilities), critical_count=critical_count, high_count=high_count, medium_count=medium_count, low_count=low_count, security_score=security_score, recommendations=recommendations, vulnerabilities=vulnerabilities[:10], # Limit to top 10 latest_secure_version=latest_secure_version, ) def _calculate_security_score( self, critical: int, high: int, medium: int, low: int ) -> float: """Calculate security score based on vulnerability counts""" # Start with perfect score score = 100.0 # Deduct points based on severity score -= critical * 25 # Critical: -25 points each score -= high * 15 # High: -15 points each score -= medium * 5 # Medium: -5 points each score -= low * 1 # Low: -1 point each # Ensure score doesn't go below 0 return max(0.0, score) def _generate_recommendations( self, library_name: str, vulnerabilities: List[Vulnerability], security_score: float, ) -> List[str]: """Generate security recommendations""" recommendations = [] if security_score < 50: recommendations.append( "🚨 High security risk - Consider alternative libraries" ) elif security_score < 70: recommendations.append("⚠️ Moderate security risk - Monitor for updates") elif security_score < 90: recommendations.append("✅ Generally secure - Keep updated") else: recommendations.append("🛡️ Excellent security record") # Specific recommendations based on vulnerabilities critical_vulns = [ v for v in vulnerabilities if v.severity == SeverityLevel.CRITICAL ] if critical_vulns: recommendations.append( "🔥 Update immediately - Critical vulnerabilities found" ) fixed_versions = [v.fixed_version for v in vulnerabilities if v.fixed_version] if fixed_versions: latest_fix = max(fixed_versions) recommendations.append(f"📦 Update to version {latest_fix} or later") if len(vulnerabilities) > 5: recommendations.append( "📊 Many vulnerabilities found - Consider security audit" ) return recommendations[:5] # Limit recommendations def _find_latest_secure_version( self, vulnerabilities: List[Vulnerability] ) -> Optional[str]: """Find the latest secure version""" fixed_versions = [v.fixed_version for v in vulnerabilities if v.fixed_version] if fixed_versions: # This is simplified - real implementation would use proper version comparison return max(fixed_versions) return None def _is_cached(self, cache_key: str) -> bool: """Check if result is cached and still valid""" if cache_key not in self.cache: return False cached_time = self.cache[cache_key]["timestamp"] return datetime.now() - cached_time < self.cache_ttl def _cache_result(self, cache_key: str, result: SecurityReport) -> None: """Cache scan result""" self.cache[cache_key] = {"data": result, "timestamp": datetime.now()} # Simple cache cleanup - remove old entries if len(self.cache) > 100: oldest_key = min( self.cache.keys(), key=lambda k: self.cache[k]["timestamp"] ) del self.cache[oldest_key] class SecurityIntegration: """Integration layer for security features""" def __init__(self, scanner: VulnerabilityScanner): self.scanner = scanner async def get_security_score( self, library_name: str, ecosystem: str = "PyPI" ) -> float: """Get security score for a library (0-100, higher is better)""" try: report = await self.scanner.scan_library(library_name, ecosystem) return report.security_score except Exception: return 50.0 # Default neutral score async def is_library_secure( self, library_name: str, ecosystem: str = "PyPI", threshold: float = 70.0 ) -> bool: """Check if library meets security threshold""" score = await self.get_security_score(library_name, ecosystem) return score >= threshold async def get_security_summary( self, library_name: str, ecosystem: str = "PyPI" ) -> Dict[str, Any]: """Get concise security summary""" try: report = await self.scanner.scan_library(library_name, ecosystem) return { "library": library_name, "security_score": report.security_score, "total_vulnerabilities": report.total_vulnerabilities, "critical_vulnerabilities": report.critical_count, "status": "secure" if report.security_score >= 70 else "at_risk", "primary_recommendation": ( report.recommendations[0] if report.recommendations else "No specific recommendations" ), } except Exception as e: return { "library": library_name, "security_score": 50.0, "error": str(e), "status": "unknown", } # Global instances vulnerability_scanner = VulnerabilityScanner() security_integration = SecurityIntegration(vulnerability_scanner)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anton-prosterity/documentation-search-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server