Skip to main content
Glama

MCP Kali Pentest

by Root1856
pentest_engine.py • 13.6 kB
""" Penetration Testing Engine Handles session management, reporting, and coordination """ import json import logging from typing import Dict, List, Any, Optional from datetime import datetime from pathlib import Path import hashlib logger = logging.getLogger(__name__) class PentestEngine: """Core penetration testing engine""" def __init__(self, data_dir: str = "/var/lib/mcpkali"): self.data_dir = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) self.reports_dir = self.data_dir / "reports" self.reports_dir.mkdir(exist_ok=True) self.sessions_dir = self.data_dir / "sessions" self.sessions_dir.mkdir(exist_ok=True) def get_available_reports(self) -> List[Dict[str, Any]]: """Get list of available reports""" reports = [] for report_file in self.reports_dir.glob("*.json"): try: with open(report_file, "r") as f: report_data = json.load(f) reports.append({ "id": report_file.stem, "name": report_data.get("name", report_file.stem), "description": report_data.get("description", ""), "created_at": report_data.get("created_at", ""), "target": report_data.get("target", "") }) except Exception as e: logger.error(f"Error reading report {report_file}: {e}") return sorted(reports, key=lambda x: x.get("created_at", ""), reverse=True) def get_report(self, report_id: str) -> Dict[str, Any]: """Get specific report""" report_file = self.reports_dir / f"{report_id}.json" if not report_file.exists(): raise ValueError(f"Report {report_id} not found") with open(report_file, "r") as f: return json.load(f) async def generate_report( self, session: Dict[str, Any], format: str = "json" ) -> Dict[str, Any]: """Generate comprehensive penetration test report""" report_id = f"report_{session['id']}" report = { "id": report_id, "name": f"Penetration Test Report - {session['target']}", "description": f"Comprehensive security assessment of {session['target']}", "created_at": datetime.now().isoformat(), "target": session["target"], "scope": session.get("scope", []), "duration": 
self._calculate_duration(session), "executive_summary": self._generate_executive_summary(session), "findings": self._categorize_findings(session["findings"]), "risk_assessment": self._assess_overall_risk(session), "recommendations": self._generate_recommendations(session), "timeline": session.get("timeline", []), "tools_used": self._get_tools_used(session), "statistics": self._generate_statistics(session) } # Save report if format == "json": report_file = self.reports_dir / f"{report_id}.json" with open(report_file, "w") as f: json.dump(report, f, indent=2) elif format == "html": report["html"] = self._generate_html_report(report) elif format == "markdown": report["markdown"] = self._generate_markdown_report(report) elif format == "pdf": report["note"] = "PDF generation requires additional tools (wkhtmltopdf)" report["html"] = self._generate_html_report(report) return report def _calculate_duration(self, session: Dict[str, Any]) -> str: """Calculate test duration""" started = datetime.fromisoformat(session["started_at"]) ended = datetime.fromisoformat(session.get("completed_at", datetime.now().isoformat())) duration = ended - started hours = duration.total_seconds() / 3600 return f"{hours:.2f} hours" def _generate_executive_summary(self, session: Dict[str, Any]) -> Dict[str, Any]: """Generate executive summary""" findings = session.get("findings", []) critical_count = sum(1 for f in findings if "critical" in str(f).lower()) high_count = sum(1 for f in findings if "high" in str(f).lower()) medium_count = sum(1 for f in findings if "medium" in str(f).lower()) low_count = sum(1 for f in findings if "low" in str(f).lower()) return { "target": session["target"], "test_date": session["started_at"], "duration": self._calculate_duration(session), "total_findings": len(findings), "by_severity": { "critical": critical_count, "high": high_count, "medium": medium_count, "low": low_count }, "overall_risk": self._assess_overall_risk(session), "key_findings": 
self._get_key_findings(findings) } def _categorize_findings(self, findings: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: """Categorize findings by type and severity""" categorized = { "critical": [], "high": [], "medium": [], "low": [], "informational": [] } for finding in findings: severity = self._determine_severity(finding) categorized[severity].append({ "tool": finding.get("tool"), "timestamp": finding.get("timestamp"), "phase": finding.get("phase"), "details": finding.get("result") }) return categorized def _determine_severity(self, finding: Dict[str, Any]) -> str: """Determine finding severity""" result_str = str(finding.get("result", "")).lower() if "critical" in result_str: return "critical" elif "high" in result_str: return "high" elif "medium" in result_str: return "medium" elif "low" in result_str: return "low" else: return "informational" def _assess_overall_risk(self, session: Dict[str, Any]) -> str: """Assess overall risk level""" findings = session.get("findings", []) critical_count = sum(1 for f in findings if "critical" in str(f).lower()) high_count = sum(1 for f in findings if "high" in str(f).lower()) if critical_count > 0: return "CRITICAL" elif high_count >= 3: return "HIGH" elif high_count > 0: return "MEDIUM" else: return "LOW" def _generate_recommendations(self, session: Dict[str, Any]) -> List[Dict[str, Any]]: """Generate remediation recommendations""" recommendations = [] findings = session.get("findings", []) # Analyze findings and generate recommendations for finding in findings: tool = finding.get("tool") result = finding.get("result", {}) if tool == "sqlmap_scan" and result.get("vulnerable"): recommendations.append({ "priority": "CRITICAL", "title": "SQL Injection Vulnerability", "description": "Application is vulnerable to SQL injection attacks", "remediation": "Implement parameterized queries and input validation", "references": ["OWASP Top 10 - A03:2021"] }) elif tool == "ssl_scan" and result.get("findings", 
{}).get("weak_ciphers"): recommendations.append({ "priority": "HIGH", "title": "Weak SSL/TLS Configuration", "description": "Server supports weak cipher suites", "remediation": "Disable weak ciphers and enable only TLS 1.2+", "references": ["OWASP TLS Cheat Sheet"] }) return recommendations def _get_key_findings(self, findings: List[Dict[str, Any]]) -> List[str]: """Extract key findings for executive summary""" key_findings = [] for finding in findings[:5]: # Top 5 findings tool = finding.get("tool") result = finding.get("result", {}) if result.get("vulnerable"): key_findings.append(f"Vulnerability detected by {tool}") elif result.get("vulnerabilities"): count = len(result["vulnerabilities"]) key_findings.append(f"{count} vulnerabilities found by {tool}") return key_findings def _get_tools_used(self, session: Dict[str, Any]) -> List[str]: """Get list of tools used in the test""" findings = session.get("findings", []) tools = list(set([f.get("tool") for f in findings if f.get("tool")])) return sorted(tools) def _generate_statistics(self, session: Dict[str, Any]) -> Dict[str, Any]: """Generate test statistics""" findings = session.get("findings", []) return { "total_scans": len(findings), "tools_used": len(self._get_tools_used(session)), "phases_completed": len(session.get("timeline", [])), "duration": self._calculate_duration(session), "findings_by_phase": self._count_by_phase(findings) } def _count_by_phase(self, findings: List[Dict[str, Any]]) -> Dict[str, int]: """Count findings by phase""" counts = {} for finding in findings: phase = finding.get("phase", "unknown") counts[phase] = counts.get(phase, 0) + 1 return counts def _generate_html_report(self, report: Dict[str, Any]) -> str: """Generate HTML report""" html = f""" <!DOCTYPE html> <html> <head> <title>{report['name']}</title> <style> body {{ font-family: Arial, sans-serif; margin: 40px; }} h1 {{ color: #333; }} h2 {{ color: #666; border-bottom: 2px solid #ddd; padding-bottom: 5px; }} .critical {{ color: 
#d32f2f; font-weight: bold; }} .high {{ color: #f57c00; font-weight: bold; }} .medium {{ color: #fbc02d; font-weight: bold; }} .low {{ color: #388e3c; font-weight: bold; }} table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }} th, td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }} th {{ background-color: #f5f5f5; }} </style> </head> <body> <h1>{report['name']}</h1> <p><strong>Target:</strong> {report['target']}</p> <p><strong>Date:</strong> {report['created_at']}</p> <p><strong>Duration:</strong> {report['duration']}</p> <h2>Executive Summary</h2> <p><strong>Overall Risk:</strong> <span class="{report['risk_assessment'].lower()}">{report['risk_assessment']}</span></p> <p><strong>Total Findings:</strong> {report['executive_summary']['total_findings']}</p> <h3>Findings by Severity</h3> <table> <tr> <th>Severity</th> <th>Count</th> </tr> <tr> <td class="critical">Critical</td> <td>{report['executive_summary']['by_severity']['critical']}</td> </tr> <tr> <td class="high">High</td> <td>{report['executive_summary']['by_severity']['high']}</td> </tr> <tr> <td class="medium">Medium</td> <td>{report['executive_summary']['by_severity']['medium']}</td> </tr> <tr> <td class="low">Low</td> <td>{report['executive_summary']['by_severity']['low']}</td> </tr> </table> <h2>Recommendations</h2> <ul> """ for rec in report['recommendations']: html += f""" <li> <strong class="{rec['priority'].lower()}">{rec['title']}</strong><br> {rec['description']}<br> <em>Remediation: {rec['remediation']}</em> </li> """ html += """ </ul> <h2>Tools Used</h2> <ul> """ for tool in report['tools_used']: html += f" <li>{tool}</li>\n" html += """ </ul> </body> </html> """ return html def _generate_markdown_report(self, report: Dict[str, Any]) -> str: """Generate Markdown report""" md = f"""# {report['name']} **Target:** {report['target']} **Date:** {report['created_at']} **Duration:** {report['duration']} ## Executive Summary **Overall Risk:** {report['risk_assessment']} **Total 
Findings:** {report['executive_summary']['total_findings']} ### Findings by Severity | Severity | Count | |----------|-------| | Critical | {report['executive_summary']['by_severity']['critical']} | | High | {report['executive_summary']['by_severity']['high']} | | Medium | {report['executive_summary']['by_severity']['medium']} | | Low | {report['executive_summary']['by_severity']['low']} | ## Key Findings """ for finding in report['executive_summary'].get('key_findings', []): md += f"- {finding}\n" md += "\n## Recommendations\n\n" for rec in report['recommendations']: md += f"""### {rec['title']} [{rec['priority']}] {rec['description']} **Remediation:** {rec['remediation']} **References:** {', '.join(rec.get('references', []))} --- """ md += "\n## Tools Used\n\n" for tool in report['tools_used']: md += f"- {tool}\n" return md

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Root1856/mcpkali'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.