"""
Farnsworth Vulnerability Scanner
"Good news, everyone! I've created a scanner that finds weaknesses before the bad guys do!"
Comprehensive vulnerability scanning for authorized security testing.
IMPORTANT: Only use with proper authorization on systems you own or have permission to test.
"""
import asyncio
import ssl
import socket
import re
from typing import Dict, Any, List, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from loguru import logger
class VulnerabilitySeverity(Enum):
"""Vulnerability severity levels."""
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
class VulnerabilityCategory(Enum):
"""Categories of vulnerabilities."""
CONFIGURATION = "configuration"
SSL_TLS = "ssl_tls"
AUTHENTICATION = "authentication"
INJECTION = "injection"
XSS = "xss"
EXPOSURE = "exposure"
OUTDATED = "outdated"
NETWORK = "network"
DNS = "dns"
HEADERS = "headers"
@dataclass
class Vulnerability:
"""A detected vulnerability."""
id: str
title: str
description: str
severity: VulnerabilitySeverity
category: VulnerabilityCategory
affected_component: str
evidence: str = ""
remediation: str = ""
references: List[str] = field(default_factory=list)
cvss_score: Optional[float] = None
cve_id: Optional[str] = None
@dataclass
class VulnerabilityReport:
"""Complete vulnerability scan report."""
target: str
scan_start: datetime
scan_end: Optional[datetime] = None
vulnerabilities: List[Vulnerability] = field(default_factory=list)
scan_type: str = "full"
scanner_version: str = "1.0.0"
@property
def critical_count(self) -> int:
return len([v for v in self.vulnerabilities if v.severity == VulnerabilitySeverity.CRITICAL])
@property
def high_count(self) -> int:
return len([v for v in self.vulnerabilities if v.severity == VulnerabilitySeverity.HIGH])
@property
def medium_count(self) -> int:
return len([v for v in self.vulnerabilities if v.severity == VulnerabilitySeverity.MEDIUM])
def to_dict(self) -> Dict[str, Any]:
return {
"target": self.target,
"scan_start": self.scan_start.isoformat(),
"scan_end": self.scan_end.isoformat() if self.scan_end else None,
"summary": {
"critical": self.critical_count,
"high": self.high_count,
"medium": self.medium_count,
"total": len(self.vulnerabilities),
},
"vulnerabilities": [
{
"id": v.id,
"title": v.title,
"severity": v.severity.value,
"category": v.category.value,
"description": v.description,
"affected_component": v.affected_component,
"evidence": v.evidence,
"remediation": v.remediation,
"references": v.references,
"cvss_score": v.cvss_score,
"cve_id": v.cve_id,
}
for v in self.vulnerabilities
],
}
class VulnerabilityScanner:
"""
Comprehensive vulnerability scanner.
For authorized security testing only.
Features:
- SSL/TLS configuration analysis
- Security header checking
- Port and service detection
- Configuration auditing
- DNS security analysis
"""
# Security headers to check
SECURITY_HEADERS = {
"Strict-Transport-Security": {
"required": True,
"severity": VulnerabilitySeverity.MEDIUM,
"description": "HSTS header missing - allows downgrade attacks",
},
"Content-Security-Policy": {
"required": True,
"severity": VulnerabilitySeverity.MEDIUM,
"description": "CSP header missing - increases XSS risk",
},
"X-Content-Type-Options": {
"required": True,
"severity": VulnerabilitySeverity.LOW,
"description": "X-Content-Type-Options missing - MIME sniffing possible",
},
"X-Frame-Options": {
"required": True,
"severity": VulnerabilitySeverity.MEDIUM,
"description": "X-Frame-Options missing - clickjacking possible",
},
"X-XSS-Protection": {
"required": False,
"severity": VulnerabilitySeverity.INFO,
"description": "X-XSS-Protection header (deprecated but useful for older browsers)",
},
"Referrer-Policy": {
"required": True,
"severity": VulnerabilitySeverity.LOW,
"description": "Referrer-Policy missing - may leak sensitive URLs",
},
"Permissions-Policy": {
"required": False,
"severity": VulnerabilitySeverity.LOW,
"description": "Permissions-Policy missing - browser features not restricted",
},
}
# Weak SSL/TLS configurations
WEAK_SSL_CONFIGS = {
"SSLv2": VulnerabilitySeverity.CRITICAL,
"SSLv3": VulnerabilitySeverity.CRITICAL,
"TLSv1.0": VulnerabilitySeverity.HIGH,
"TLSv1.1": VulnerabilitySeverity.MEDIUM,
}
# Dangerous ports
DANGEROUS_PORTS = {
21: ("FTP", "Unencrypted file transfer"),
23: ("Telnet", "Unencrypted remote access"),
25: ("SMTP", "May allow open relay"),
110: ("POP3", "Unencrypted email"),
143: ("IMAP", "Unencrypted email"),
445: ("SMB", "Windows file sharing - often targeted"),
3389: ("RDP", "Remote desktop - common attack vector"),
5900: ("VNC", "Remote desktop - often unencrypted"),
}
def __init__(self):
"""Initialize vulnerability scanner."""
self._vuln_counter = 0
def _generate_vuln_id(self) -> str:
"""Generate unique vulnerability ID."""
self._vuln_counter += 1
return f"FARN-{self._vuln_counter:04d}"
async def scan(
self,
target: str,
scan_type: str = "full",
ports: Optional[List[int]] = None,
) -> VulnerabilityReport:
"""
Perform vulnerability scan on target.
Args:
target: Hostname or IP address
scan_type: "full", "quick", "ssl", "headers", "ports"
ports: Specific ports to scan (optional)
Returns:
VulnerabilityReport with findings
"""
self._vuln_counter = 0
report = VulnerabilityReport(
target=target,
scan_start=datetime.now(),
scan_type=scan_type,
)
logger.info(f"Starting {scan_type} vulnerability scan on {target}")
try:
# Resolve target
try:
ip = socket.gethostbyname(target)
logger.debug(f"Resolved {target} to {ip}")
except socket.gaierror:
ip = target
# Run appropriate scans based on type
if scan_type in ["full", "ssl"]:
ssl_vulns = await self._scan_ssl(target)
report.vulnerabilities.extend(ssl_vulns)
if scan_type in ["full", "headers"]:
header_vulns = await self._scan_headers(target)
report.vulnerabilities.extend(header_vulns)
if scan_type in ["full", "ports", "quick"]:
port_list = ports or list(self.DANGEROUS_PORTS.keys())
port_vulns = await self._scan_ports(ip, port_list)
report.vulnerabilities.extend(port_vulns)
if scan_type in ["full"]:
dns_vulns = await self._scan_dns(target)
report.vulnerabilities.extend(dns_vulns)
except Exception as e:
logger.error(f"Scan error: {e}")
report.scan_end = datetime.now()
logger.info(
f"Scan complete: {len(report.vulnerabilities)} findings "
f"({report.critical_count} critical, {report.high_count} high)"
)
return report
async def _scan_ssl(self, target: str) -> List[Vulnerability]:
"""Scan SSL/TLS configuration."""
vulnerabilities = []
try:
# Test SSL connection
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
reader, writer = await asyncio.wait_for(
asyncio.open_connection(
target, 443, ssl=context
),
timeout=10.0,
)
# Get SSL info
ssl_object = writer.get_extra_info("ssl_object")
if ssl_object:
protocol = ssl_object.version()
cipher = ssl_object.cipher()
# Check protocol version
if protocol in self.WEAK_SSL_CONFIGS:
vulnerabilities.append(Vulnerability(
id=self._generate_vuln_id(),
title=f"Weak SSL/TLS Protocol: {protocol}",
description=f"The server supports {protocol} which has known vulnerabilities",
severity=self.WEAK_SSL_CONFIGS[protocol],
category=VulnerabilityCategory.SSL_TLS,
affected_component=f"{target}:443",
evidence=f"Protocol: {protocol}",
remediation=f"Disable {protocol} and use TLS 1.2 or higher",
references=[
"https://www.ssl.com/guide/disable-tls-1-0-and-1-1/",
],
))
# Check cipher strength
if cipher:
cipher_name, _, key_bits = cipher
if key_bits and key_bits < 128:
vulnerabilities.append(Vulnerability(
id=self._generate_vuln_id(),
title="Weak Cipher Suite",
description=f"Server uses weak cipher with only {key_bits} bits",
severity=VulnerabilitySeverity.HIGH,
category=VulnerabilityCategory.SSL_TLS,
affected_component=f"{target}:443",
evidence=f"Cipher: {cipher_name}, Key bits: {key_bits}",
remediation="Configure server to use strong cipher suites",
))
# Check certificate
cert = ssl_object.getpeercert()
if not cert:
vulnerabilities.append(Vulnerability(
id=self._generate_vuln_id(),
title="Invalid or Missing SSL Certificate",
description="Could not retrieve valid SSL certificate",
severity=VulnerabilitySeverity.HIGH,
category=VulnerabilityCategory.SSL_TLS,
affected_component=f"{target}:443",
remediation="Install a valid SSL certificate from a trusted CA",
))
writer.close()
await writer.wait_closed()
except ssl.SSLError as e:
if "certificate verify failed" in str(e):
vulnerabilities.append(Vulnerability(
id=self._generate_vuln_id(),
title="SSL Certificate Verification Failed",
description="The SSL certificate could not be verified",
severity=VulnerabilitySeverity.MEDIUM,
category=VulnerabilityCategory.SSL_TLS,
affected_component=f"{target}:443",
evidence=str(e),
remediation="Use a certificate from a trusted Certificate Authority",
))
except asyncio.TimeoutError:
pass # No HTTPS available
except Exception as e:
logger.debug(f"SSL scan error: {e}")
return vulnerabilities
async def _scan_headers(self, target: str) -> List[Vulnerability]:
"""Scan HTTP security headers."""
vulnerabilities = []
try:
# Try HTTPS first, fall back to HTTP
for protocol in ["https", "http"]:
try:
import httpx
async with httpx.AsyncClient(verify=False) as client:
response = await client.get(
f"{protocol}://{target}",
timeout=10.0,
follow_redirects=True,
)
headers = dict(response.headers)
# Check each security header
for header_name, config in self.SECURITY_HEADERS.items():
if header_name not in headers:
if config["required"]:
vulnerabilities.append(Vulnerability(
id=self._generate_vuln_id(),
title=f"Missing Security Header: {header_name}",
description=config["description"],
severity=config["severity"],
category=VulnerabilityCategory.HEADERS,
affected_component=f"{target} ({protocol})",
remediation=f"Add {header_name} header to HTTP responses",
references=[
"https://owasp.org/www-project-secure-headers/",
],
))
# Check for information disclosure
info_headers = ["Server", "X-Powered-By", "X-AspNet-Version"]
for header in info_headers:
if header in headers:
vulnerabilities.append(Vulnerability(
id=self._generate_vuln_id(),
title=f"Information Disclosure: {header}",
description=f"Server exposes {header} header with version information",
severity=VulnerabilitySeverity.LOW,
category=VulnerabilityCategory.EXPOSURE,
affected_component=f"{target}",
evidence=f"{header}: {headers[header]}",
remediation=f"Remove or obfuscate {header} header",
))
break # Success, no need to try HTTP
except Exception:
continue
except ImportError:
logger.warning("httpx not installed, skipping header scan")
except Exception as e:
logger.debug(f"Header scan error: {e}")
return vulnerabilities
async def _scan_ports(self, target: str, ports: List[int]) -> List[Vulnerability]:
"""Scan for open dangerous ports."""
vulnerabilities = []
async def check_port(port: int) -> Optional[Vulnerability]:
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(target, port),
timeout=3.0,
)
writer.close()
await writer.wait_closed()
# Port is open
if port in self.DANGEROUS_PORTS:
service_name, description = self.DANGEROUS_PORTS[port]
return Vulnerability(
id=self._generate_vuln_id(),
title=f"Potentially Dangerous Port Open: {port}/{service_name}",
description=f"{service_name} port is open. {description}",
severity=VulnerabilitySeverity.MEDIUM,
category=VulnerabilityCategory.NETWORK,
affected_component=f"{target}:{port}",
remediation=f"Close port {port} if not needed, or ensure proper security measures",
)
return None
except (asyncio.TimeoutError, ConnectionRefusedError, OSError):
return None
# Scan ports concurrently
tasks = [check_port(port) for port in ports]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Vulnerability):
vulnerabilities.append(result)
return vulnerabilities
async def _scan_dns(self, target: str) -> List[Vulnerability]:
"""Scan DNS configuration for security issues."""
vulnerabilities = []
try:
import dns.resolver
import dns.exception
except ImportError:
logger.debug("dnspython not installed, skipping DNS scan")
return vulnerabilities
# Extract domain from target
domain = target
if domain.startswith("www."):
domain = domain[4:]
try:
# Check for SPF record
try:
txt_records = dns.resolver.resolve(domain, "TXT")
has_spf = any("v=spf1" in str(r) for r in txt_records)
if not has_spf:
vulnerabilities.append(Vulnerability(
id=self._generate_vuln_id(),
title="Missing SPF Record",
description="No SPF record found - domain may be vulnerable to email spoofing",
severity=VulnerabilitySeverity.MEDIUM,
category=VulnerabilityCategory.DNS,
affected_component=domain,
remediation="Add an SPF TXT record to prevent email spoofing",
references=["https://support.google.com/a/answer/33786"],
))
except dns.exception.DNSException:
pass
# Check for DMARC record
try:
dmarc_records = dns.resolver.resolve(f"_dmarc.{domain}", "TXT")
has_dmarc = any("v=DMARC1" in str(r) for r in dmarc_records)
if not has_dmarc:
raise dns.exception.DNSException()
except dns.exception.DNSException:
vulnerabilities.append(Vulnerability(
id=self._generate_vuln_id(),
title="Missing DMARC Record",
description="No DMARC record found - email authentication policy not enforced",
severity=VulnerabilitySeverity.MEDIUM,
category=VulnerabilityCategory.DNS,
affected_component=domain,
remediation="Add a DMARC TXT record at _dmarc.{domain}",
references=["https://support.google.com/a/answer/2466563"],
))
# Check for DNSSEC
try:
dns.resolver.resolve(domain, "DNSKEY")
except dns.exception.DNSException:
vulnerabilities.append(Vulnerability(
id=self._generate_vuln_id(),
title="DNSSEC Not Enabled",
description="Domain does not have DNSSEC configured",
severity=VulnerabilitySeverity.LOW,
category=VulnerabilityCategory.DNS,
affected_component=domain,
remediation="Enable DNSSEC to protect against DNS spoofing",
references=["https://www.cloudflare.com/dns/dnssec/"],
))
except Exception as e:
logger.debug(f"DNS scan error: {e}")
return vulnerabilities
def generate_html_report(self, report: VulnerabilityReport) -> str:
"""Generate HTML report from vulnerability scan."""
severity_colors = {
VulnerabilitySeverity.CRITICAL: "#dc3545",
VulnerabilitySeverity.HIGH: "#fd7e14",
VulnerabilitySeverity.MEDIUM: "#ffc107",
VulnerabilitySeverity.LOW: "#17a2b8",
VulnerabilitySeverity.INFO: "#6c757d",
}
html = f"""<!DOCTYPE html>
<html>
<head>
<title>Vulnerability Scan Report - {report.target}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; background: #1a1a2e; color: #eee; }}
.header {{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; border-radius: 10px; margin-bottom: 20px; }}
.summary {{ display: flex; gap: 20px; margin-bottom: 20px; }}
.summary-card {{ background: rgba(255,255,255,0.1); padding: 20px; border-radius: 10px; text-align: center; flex: 1; }}
.vuln {{ background: rgba(255,255,255,0.05); padding: 15px; border-radius: 8px; margin-bottom: 15px; border-left: 4px solid; }}
.critical {{ border-color: {severity_colors[VulnerabilitySeverity.CRITICAL]}; }}
.high {{ border-color: {severity_colors[VulnerabilitySeverity.HIGH]}; }}
.medium {{ border-color: {severity_colors[VulnerabilitySeverity.MEDIUM]}; }}
.low {{ border-color: {severity_colors[VulnerabilitySeverity.LOW]}; }}
.info {{ border-color: {severity_colors[VulnerabilitySeverity.INFO]}; }}
.severity-badge {{ display: inline-block; padding: 2px 8px; border-radius: 4px; font-size: 12px; font-weight: bold; color: white; }}
</style>
</head>
<body>
<div class="header">
<h1>Vulnerability Scan Report</h1>
<p>Target: {report.target}</p>
<p>Scan Time: {report.scan_start.strftime('%Y-%m-%d %H:%M:%S')}</p>
</div>
<div class="summary">
<div class="summary-card">
<h2 style="color: {severity_colors[VulnerabilitySeverity.CRITICAL]}">{report.critical_count}</h2>
<p>Critical</p>
</div>
<div class="summary-card">
<h2 style="color: {severity_colors[VulnerabilitySeverity.HIGH]}">{report.high_count}</h2>
<p>High</p>
</div>
<div class="summary-card">
<h2 style="color: {severity_colors[VulnerabilitySeverity.MEDIUM]}">{report.medium_count}</h2>
<p>Medium</p>
</div>
<div class="summary-card">
<h2>{len(report.vulnerabilities)}</h2>
<p>Total</p>
</div>
</div>
<h2>Findings</h2>
"""
for vuln in sorted(report.vulnerabilities, key=lambda v: list(VulnerabilitySeverity).index(v.severity)):
html += f"""
<div class="vuln {vuln.severity.value}">
<h3>{vuln.title}
<span class="severity-badge" style="background: {severity_colors[vuln.severity]}">{vuln.severity.value.upper()}</span>
</h3>
<p><strong>Category:</strong> {vuln.category.value}</p>
<p><strong>Affected:</strong> {vuln.affected_component}</p>
<p>{vuln.description}</p>
{"<p><strong>Evidence:</strong> " + vuln.evidence + "</p>" if vuln.evidence else ""}
<p><strong>Remediation:</strong> {vuln.remediation}</p>
</div>
"""
html += """
<footer style="margin-top: 40px; text-align: center; color: #666;">
Generated by Farnsworth Security Scanner
</footer>
</body>
</html>"""
return html
# Global instance
vulnerability_scanner = VulnerabilityScanner()