# pentest_engine.py • 13.6 kB
"""
Penetration Testing Engine
Handles session management, reporting, and coordination
"""
import hashlib
import html
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PentestEngine:
    """Core penetration testing engine.

    Creates ``reports`` and ``sessions`` directories under ``data_dir``,
    builds report dictionaries from a session dictionary, persists JSON
    reports under ``reports`` and can render a report as HTML or Markdown.
    """

    # Severity levels, most severe first. The order decides which level wins
    # when a result mentions more than one of them.
    _SEVERITY_ORDER = ("critical", "high", "medium", "low")

    # Matches a severity word that is not embedded in a longer alphabetic
    # word, so "allowed", "follow-up" or "highlight" do not count as
    # "low"/"high", while "high_risk" or "'low'" still do.
    _SEVERITY_RE = re.compile(r"(?<![a-z])(critical|high|medium|low)(?![a-z])")

    # Maximum number of entries listed as key findings in the summary.
    _MAX_KEY_FINDINGS = 5

    # Static stylesheet embedded in the HTML report.
    _HTML_STYLE = "\n".join((
        "        body { font-family: Arial, sans-serif; margin: 40px; }",
        "        h1 { color: #333; }",
        "        h2 { color: #666; border-bottom: 2px solid #ddd; padding-bottom: 5px; }",
        "        .critical { color: #d32f2f; font-weight: bold; }",
        "        .high { color: #f57c00; font-weight: bold; }",
        "        .medium { color: #fbc02d; font-weight: bold; }",
        "        .low { color: #388e3c; font-weight: bold; }",
        "        table { border-collapse: collapse; width: 100%; margin: 20px 0; }",
        "        th, td { border: 1px solid #ddd; padding: 12px; text-align: left; }",
        "        th { background-color: #f5f5f5; }",
    ))

    def __init__(self, data_dir: str = "/var/lib/mcpkali"):
        """Create the data directory and its ``reports``/``sessions`` children.

        Args:
            data_dir: Root directory for engine data; created if missing.
        """
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.reports_dir = self.data_dir / "reports"
        self.reports_dir.mkdir(exist_ok=True)
        self.sessions_dir = self.data_dir / "sessions"
        self.sessions_dir.mkdir(exist_ok=True)

    def _report_path(self, report_id: str) -> Path:
        """Return the JSON file path for ``report_id`` inside ``reports_dir``.

        Raises:
            ValueError: If the id is empty or contains a path separator or
                NUL byte, which would let it address a file outside
                ``reports_dir``.
        """
        name = str(report_id)
        if not name or any(ch in name for ch in ("/", "\\", "\x00")):
            raise ValueError(f"Invalid report id: {report_id!r}")
        return self.reports_dir / f"{name}.json"

    def get_available_reports(self) -> List[Dict[str, Any]]:
        """Return summaries of all stored reports, newest ``created_at`` first.

        Each ``*.json`` file in ``reports_dir`` yields a dict with the keys
        ``id``, ``name``, ``description``, ``created_at`` and ``target``.
        Files that cannot be read, are not valid JSON, or do not contain a
        JSON object are logged and skipped (best effort).
        """
        reports = []
        for report_file in self.reports_dir.glob("*.json"):
            try:
                with open(report_file, "r", encoding="utf-8") as f:
                    report_data = json.load(f)
            except (OSError, ValueError) as e:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
                logger.error("Error reading report %s: %s", report_file, e)
                continue
            if not isinstance(report_data, dict):
                logger.error(
                    "Error reading report %s: %s",
                    report_file,
                    "top-level JSON value is not an object",
                )
                continue
            reports.append({
                "id": report_file.stem,
                "name": report_data.get("name", report_file.stem),
                "description": report_data.get("description", ""),
                "created_at": report_data.get("created_at", ""),
                "target": report_data.get("target", ""),
            })
        # Coerce the sort key to str so a null/non-string created_at in one
        # file cannot make the comparison raise TypeError.
        return sorted(
            reports,
            key=lambda entry: str(entry.get("created_at") or ""),
            reverse=True,
        )

    def get_report(self, report_id: str) -> Dict[str, Any]:
        """Load one stored report by id.

        Args:
            report_id: File stem of the report inside ``reports_dir``.

        Returns:
            The parsed JSON content of the report file.

        Raises:
            ValueError: If the id is not a plain file name or no such
                report exists.
        """
        report_file = self._report_path(report_id)
        if not report_file.exists():
            raise ValueError(f"Report {report_id} not found")
        with open(report_file, "r", encoding="utf-8") as f:
            return json.load(f)

    async def generate_report(
        self,
        session: Dict[str, Any],
        format: str = "json"
    ) -> Dict[str, Any]:
        """Generate a penetration test report from a session dictionary.

        Args:
            session: Session data. ``id``, ``target`` and ``started_at`` are
                required; ``findings``, ``scope``, ``timeline`` and
                ``completed_at`` are optional.
            format: ``"json"`` writes the report to
                ``reports_dir/report_<id>.json``; ``"html"`` and
                ``"markdown"`` add a rendered string under that key;
                ``"pdf"`` adds a note plus the HTML rendering. Any other
                value is logged and the plain report is returned. (The
                name shadows the builtin but is kept for callers.)

        Returns:
            The report dictionary.

        Raises:
            ValueError: If ``format`` is ``"json"`` and the session id does
                not yield a safe file name.
        """
        report_id = f"report_{session['id']}"
        # Use .get like every helper does, so a session without findings
        # produces an empty report instead of a KeyError.
        findings = session.get("findings", [])
        report = {
            "id": report_id,
            "name": f"Penetration Test Report - {session['target']}",
            "description": f"Comprehensive security assessment of {session['target']}",
            "created_at": datetime.now().isoformat(),
            "target": session["target"],
            "scope": session.get("scope", []),
            "duration": self._calculate_duration(session),
            "executive_summary": self._generate_executive_summary(session),
            "findings": self._categorize_findings(findings),
            "risk_assessment": self._assess_overall_risk(session),
            "recommendations": self._generate_recommendations(session),
            "timeline": session.get("timeline", []),
            "tools_used": self._get_tools_used(session),
            "statistics": self._generate_statistics(session),
        }
        if format == "json":
            report_file = self._report_path(report_id)
            # Serialize fully before touching the file so a serialization
            # error cannot leave a truncated report on disk.
            payload = json.dumps(report, indent=2)
            report_file.write_text(payload, encoding="utf-8")
        elif format == "html":
            report["html"] = self._generate_html_report(report)
        elif format == "markdown":
            report["markdown"] = self._generate_markdown_report(report)
        elif format == "pdf":
            report["note"] = "PDF generation requires additional tools (wkhtmltopdf)"
            report["html"] = self._generate_html_report(report)
        else:
            logger.warning(
                "Unknown report format %r; returning report without rendering",
                format,
            )
        return report

    def _calculate_duration(self, session: Dict[str, Any]) -> str:
        """Return the test duration as ``"<hours> hours"`` with two decimals.

        Uses ``completed_at`` when it is set; otherwise the current time,
        taken in the timezone of ``started_at`` so that an aware start time
        is never subtracted from a naive "now".
        """
        started = datetime.fromisoformat(session["started_at"])
        completed_raw = session.get("completed_at")
        if completed_raw:
            ended = datetime.fromisoformat(completed_raw)
        else:
            # Covers both a missing key and an explicit None.
            ended = datetime.now(tz=started.tzinfo)
        hours = (ended - started).total_seconds() / 3600
        return f"{hours:.2f} hours"

    def _count_severities(self, findings: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count findings per severity as decided by ``_determine_severity``."""
        counts = {level: 0 for level in self._SEVERITY_ORDER}
        counts["informational"] = 0
        for finding in findings:
            counts[self._determine_severity(finding)] += 1
        return counts

    def _generate_executive_summary(self, session: Dict[str, Any]) -> Dict[str, Any]:
        """Generate the executive summary section of a report.

        Severity counts come from ``_determine_severity``, so each finding
        is counted exactly once and the numbers agree with the categorized
        findings section of the same report.
        """
        findings = session.get("findings", [])
        counts = self._count_severities(findings)
        return {
            "target": session["target"],
            "test_date": session["started_at"],
            "duration": self._calculate_duration(session),
            "total_findings": len(findings),
            "by_severity": {level: counts[level] for level in self._SEVERITY_ORDER},
            "overall_risk": self._assess_overall_risk(session),
            "key_findings": self._get_key_findings(findings),
        }

    def _categorize_findings(self, findings: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Group findings by severity.

        Returns:
            A dict with the keys ``critical``, ``high``, ``medium``, ``low``
            and ``informational``; each value lists the ``tool``,
            ``timestamp``, ``phase`` and ``details`` (the finding's
            ``result``) of the findings at that severity.
        """
        categorized: Dict[str, List[Dict[str, Any]]] = {
            "critical": [],
            "high": [],
            "medium": [],
            "low": [],
            "informational": [],
        }
        for finding in findings:
            severity = self._determine_severity(finding)
            categorized[severity].append({
                "tool": finding.get("tool"),
                "timestamp": finding.get("timestamp"),
                "phase": finding.get("phase"),
                "details": finding.get("result"),
            })
        return categorized

    def _determine_severity(self, finding: Dict[str, Any]) -> str:
        """Determine the severity label of one finding.

        A recognized string under the finding's own ``severity`` key wins.
        Otherwise the stringified ``result`` is searched for the words
        critical/high/medium/low (not embedded in a longer word) and the
        most severe one found is returned; ``"informational"`` if none.
        """
        explicit = finding.get("severity")
        if isinstance(explicit, str):
            label = explicit.strip().lower()
            if label in self._SEVERITY_ORDER or label == "informational":
                return label
        result_str = str(finding.get("result", "")).lower()
        mentioned = set(self._SEVERITY_RE.findall(result_str))
        for level in self._SEVERITY_ORDER:
            if level in mentioned:
                return level
        return "informational"

    def _assess_overall_risk(self, session: Dict[str, Any]) -> str:
        """Assess the overall risk level of a session.

        Returns ``"CRITICAL"`` if any finding is critical, ``"HIGH"`` for
        three or more high findings, ``"MEDIUM"`` for one or two high
        findings, else ``"LOW"``.
        """
        counts = self._count_severities(session.get("findings", []))
        if counts["critical"] > 0:
            return "CRITICAL"
        if counts["high"] >= 3:
            return "HIGH"
        if counts["high"] > 0:
            return "MEDIUM"
        return "LOW"

    def _generate_recommendations(self, session: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate remediation recommendations from the session findings.

        Only dict-shaped results are inspected; findings whose ``result`` is
        a string, ``None`` or another non-dict value are skipped instead of
        raising ``AttributeError``.
        """
        recommendations = []
        for finding in session.get("findings", []):
            result = finding.get("result")
            if not isinstance(result, dict):
                continue
            tool = finding.get("tool")
            if tool == "sqlmap_scan" and result.get("vulnerable"):
                recommendations.append({
                    "priority": "CRITICAL",
                    "title": "SQL Injection Vulnerability",
                    "description": "Application is vulnerable to SQL injection attacks",
                    "remediation": "Implement parameterized queries and input validation",
                    "references": ["OWASP Top 10 - A03:2021"],
                })
            elif tool == "ssl_scan":
                ssl_findings = result.get("findings")
                if isinstance(ssl_findings, dict) and ssl_findings.get("weak_ciphers"):
                    recommendations.append({
                        "priority": "HIGH",
                        "title": "Weak SSL/TLS Configuration",
                        "description": "Server supports weak cipher suites",
                        "remediation": "Disable weak ciphers and enable only TLS 1.2+",
                        "references": ["OWASP TLS Cheat Sheet"],
                    })
        return recommendations

    def _get_key_findings(self, findings: List[Dict[str, Any]]) -> List[str]:
        """Extract up to five key findings for the executive summary.

        All findings are scanned, in order, until five entries have been
        collected, so vulnerabilities recorded after the first five raw
        findings are still reported. Non-dict results are skipped.
        """
        key_findings: List[str] = []
        for finding in findings:
            if len(key_findings) >= self._MAX_KEY_FINDINGS:
                break
            result = finding.get("result")
            if not isinstance(result, dict):
                continue
            tool = finding.get("tool")
            if result.get("vulnerable"):
                key_findings.append(f"Vulnerability detected by {tool}")
            elif result.get("vulnerabilities"):
                vulnerabilities = result["vulnerabilities"]
                try:
                    count = len(vulnerabilities)
                except TypeError:
                    # A value without a length (e.g. a plain number) is
                    # reported as the count itself.
                    count = vulnerabilities
                key_findings.append(f"{count} vulnerabilities found by {tool}")
        return key_findings

    def _get_tools_used(self, session: Dict[str, Any]) -> List[str]:
        """Return the sorted, de-duplicated truthy ``tool`` values of the findings."""
        findings = session.get("findings", [])
        return sorted({f.get("tool") for f in findings if f.get("tool")})

    def _generate_statistics(self, session: Dict[str, Any]) -> Dict[str, Any]:
        """Generate test statistics (scan count, tools, phases, duration)."""
        findings = session.get("findings", [])
        return {
            "total_scans": len(findings),
            "tools_used": len(self._get_tools_used(session)),
            "phases_completed": len(session.get("timeline", [])),
            "duration": self._calculate_duration(session),
            "findings_by_phase": self._count_by_phase(findings),
        }

    def _count_by_phase(self, findings: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count findings per ``phase``; a missing phase counts as ``"unknown"``."""
        counts: Dict[str, int] = {}
        for finding in findings:
            phase = finding.get("phase", "unknown")
            counts[phase] = counts.get(phase, 0) + 1
        return counts

    def _generate_html_report(self, report: Dict[str, Any]) -> str:
        """Render ``report`` as a standalone HTML document.

        Every interpolated value is HTML-escaped, because targets, tool
        names and recommendation text originate from session/scan data and
        must not be able to inject markup into the report.
        """
        def esc(value: Any) -> str:
            return html.escape(str(value), quote=True)

        summary = report["executive_summary"]
        by_severity = summary["by_severity"]
        risk = report["risk_assessment"]
        risk_class = esc(str(risk).lower())

        lines = [
            "<!DOCTYPE html>",
            "<html>",
            "<head>",
            f"    <title>{esc(report['name'])}</title>",
            "    <style>",
            self._HTML_STYLE,
            "    </style>",
            "</head>",
            "<body>",
            f"    <h1>{esc(report['name'])}</h1>",
            f"    <p><strong>Target:</strong> {esc(report['target'])}</p>",
            f"    <p><strong>Date:</strong> {esc(report['created_at'])}</p>",
            f"    <p><strong>Duration:</strong> {esc(report['duration'])}</p>",
            "    <h2>Executive Summary</h2>",
            f'    <p><strong>Overall Risk:</strong> <span class="{risk_class}">{esc(risk)}</span></p>',
            f"    <p><strong>Total Findings:</strong> {esc(summary['total_findings'])}</p>",
            "    <h3>Findings by Severity</h3>",
            "    <table>",
            "        <tr>",
            "            <th>Severity</th>",
            "            <th>Count</th>",
            "        </tr>",
        ]
        for level in self._SEVERITY_ORDER:
            lines += [
                "        <tr>",
                f'            <td class="{level}">{level.capitalize()}</td>',
                f"            <td>{esc(by_severity[level])}</td>",
                "        </tr>",
            ]
        lines += [
            "    </table>",
            "    <h2>Recommendations</h2>",
            "    <ul>",
        ]
        for rec in report["recommendations"]:
            priority_class = esc(str(rec["priority"]).lower())
            lines += [
                "        <li>",
                f'            <strong class="{priority_class}">{esc(rec["title"])}</strong><br>',
                f"            {esc(rec['description'])}<br>",
                f"            <em>Remediation: {esc(rec['remediation'])}</em>",
                "        </li>",
            ]
        lines += [
            "    </ul>",
            "    <h2>Tools Used</h2>",
            "    <ul>",
        ]
        lines.extend(f"        <li>{esc(tool)}</li>" for tool in report["tools_used"])
        lines += [
            "    </ul>",
            "</body>",
            "</html>",
        ]
        return "\n".join(lines) + "\n"

    def _generate_markdown_report(self, report: Dict[str, Any]) -> str:
        """Render ``report`` as Markdown.

        Blocks are separated by blank lines so that each field renders as
        its own paragraph and the ``---`` rule after a recommendation is not
        parsed as a setext heading underline for the line above it.
        """
        summary = report["executive_summary"]
        by_severity = summary["by_severity"]

        lines = [
            f"# {report['name']}",
            "",
            f"**Target:** {report['target']}",
            "",
            f"**Date:** {report['created_at']}",
            "",
            f"**Duration:** {report['duration']}",
            "",
            "## Executive Summary",
            "",
            f"**Overall Risk:** {report['risk_assessment']}",
            "",
            f"**Total Findings:** {summary['total_findings']}",
            "",
            "### Findings by Severity",
            "",
            "| Severity | Count |",
            "|----------|-------|",
        ]
        lines.extend(
            f"| {level.capitalize()} | {by_severity[level]} |"
            for level in self._SEVERITY_ORDER
        )
        lines += ["", "## Key Findings", ""]
        lines.extend(f"- {finding}" for finding in summary.get("key_findings", []))
        lines += ["", "## Recommendations", ""]
        for rec in report["recommendations"]:
            lines += [
                f"### {rec['title']} [{rec['priority']}]",
                "",
                f"{rec['description']}",
                "",
                f"**Remediation:** {rec['remediation']}",
                "",
                f"**References:** {', '.join(rec.get('references', []))}",
                "",
                "---",
                "",
            ]
        lines += ["## Tools Used", ""]
        lines.extend(f"- {tool}" for tool in report["tools_used"])
        return "\n".join(lines) + "\n"