example_usage.pyβ’9.51 kB
#!/usr/bin/env python3
"""
Example usage of MCP Kali Pentest Server
Demonstrates how to use the MCP client to interact with the pentest server
"""
import asyncio
import json
from mcp import ClientSession
from mcp.client.stdio import stdio_client, StdioServerParameters
async def example_autonomous_pentest():
"""Example: Start an autonomous penetration test"""
print("=" * 60)
print("Example 1: Autonomous Penetration Test")
print("=" * 60)
# Create MCP client
server_params = StdioServerParameters(
command="python3",
args=["server.py"],
env=None
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize
await session.initialize()
# Start autonomous pentest
result = await session.call_tool(
"start_autonomous_pentest",
arguments={
"target": "192.168.1.100",
"scope": ["192.168.1.100"],
"depth": "vulnerability_scan",
"rules_of_engagement": {
"allow_exploitation": False,
"rate_limit": True
}
}
)
print("\nResult:")
print(json.dumps(result, indent=2))
async def example_nmap_scan():
"""Example: Run Nmap scan"""
print("\n" + "=" * 60)
print("Example 2: Nmap Scan")
print("=" * 60)
server_params = StdioServerParameters(
command="python3",
args=["server.py"],
env=None
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# Run Nmap scan
result = await session.call_tool(
"nmap_scan",
arguments={
"target": "scanme.nmap.org",
"scan_type": "quick",
"scripts": ["http-title", "http-headers"]
}
)
print("\nScan Results:")
print(json.dumps(result, indent=2))
async def example_web_vulnerability_scan():
"""Example: Web application vulnerability scanning"""
print("\n" + "=" * 60)
print("Example 3: Web Vulnerability Assessment")
print("=" * 60)
server_params = StdioServerParameters(
command="python3",
args=["server.py"],
env=None
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# Run comprehensive web assessment
result = await session.call_tool(
"vulnerability_assessment",
arguments={
"target": "http://testphp.vulnweb.com",
"assessment_type": "web"
}
)
print("\nVulnerability Assessment:")
print(json.dumps(result, indent=2))
async def example_ai_suggestion():
"""Example: Get AI suggestions based on findings"""
print("\n" + "=" * 60)
print("Example 4: AI-Powered Suggestions")
print("=" * 60)
server_params = StdioServerParameters(
command="python3",
args=["server.py"],
env=None
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# Get AI suggestion
result = await session.call_tool(
"get_ai_suggestion",
arguments={
"session_id": "pentest_20241105_120000",
"context": "Found open ports 22, 80, 443, 3306. What should I scan next?"
}
)
print("\nAI Suggestion:")
print(json.dumps(result, indent=2))
async def example_list_resources():
"""Example: List available resources"""
print("\n" + "=" * 60)
print("Example 5: List Resources (Sessions & Reports)")
print("=" * 60)
server_params = StdioServerParameters(
command="python3",
args=["server.py"],
env=None
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# List resources
resources = await session.list_resources()
print("\nAvailable Resources:")
for resource in resources:
print(f" - {resource.name}")
print(f" URI: {resource.uri}")
print(f" Description: {resource.description}")
print()
async def example_generate_report():
"""Example: Generate penetration test report"""
print("\n" + "=" * 60)
print("Example 6: Generate Report")
print("=" * 60)
server_params = StdioServerParameters(
command="python3",
args=["server.py"],
env=None
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# Generate report
result = await session.call_tool(
"generate_report",
arguments={
"session_id": "pentest_20241105_120000",
"format": "html"
}
)
print("\nReport Generated:")
print(json.dumps(result, indent=2))
async def example_dns_enumeration():
"""Example: DNS enumeration and subdomain discovery"""
print("\n" + "=" * 60)
print("Example 7: DNS Enumeration")
print("=" * 60)
server_params = StdioServerParameters(
command="python3",
args=["server.py"],
env=None
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# DNS enumeration
result = await session.call_tool(
"dns_enum",
arguments={
"domain": "example.com",
"record_types": ["A", "AAAA", "MX", "NS", "TXT"]
}
)
print("\nDNS Enumeration Results:")
print(json.dumps(result, indent=2))
async def example_nuclei_scan():
"""Example: Fast vulnerability scanning with Nuclei"""
print("\n" + "=" * 60)
print("Example 8: Nuclei Vulnerability Scan")
print("=" * 60)
server_params = StdioServerParameters(
command="python3",
args=["server.py"],
env=None
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# Nuclei scan
result = await session.call_tool(
"nuclei_scan",
arguments={
"target": "http://testphp.vulnweb.com",
"severity": ["critical", "high"]
}
)
print("\nNuclei Scan Results:")
print(json.dumps(result, indent=2))
def print_menu():
"""Print example menu"""
print("\n" + "=" * 60)
print("MCP Kali Pentest - Example Usage")
print("=" * 60)
print("\nAvailable Examples:")
print("1. Autonomous Penetration Test")
print("2. Nmap Port Scan")
print("3. Web Vulnerability Assessment")
print("4. AI-Powered Suggestions")
print("5. List Resources")
print("6. Generate Report")
print("7. DNS Enumeration")
print("8. Nuclei Vulnerability Scan")
print("9. Run All Examples")
print("0. Exit")
print()
async def main():
"""Main function"""
print("\nWARNING: Only use these examples on systems you own or have")
print("explicit written permission to test!\n")
while True:
print_menu()
choice = input("Select example (0-9): ").strip()
if choice == "0":
print("Goodbye!")
break
elif choice == "1":
await example_autonomous_pentest()
elif choice == "2":
await example_nmap_scan()
elif choice == "3":
await example_web_vulnerability_scan()
elif choice == "4":
await example_ai_suggestion()
elif choice == "5":
await example_list_resources()
elif choice == "6":
await example_generate_report()
elif choice == "7":
await example_dns_enumeration()
elif choice == "8":
await example_nuclei_scan()
elif choice == "9":
print("\nRunning all examples...")
await example_nmap_scan()
await example_dns_enumeration()
await example_web_vulnerability_scan()
await example_nuclei_scan()
await example_ai_suggestion()
await example_list_resources()
print("\nAll examples completed!")
else:
print("Invalid choice. Please try again.")
input("\nPress Enter to continue...")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\nInterrupted by user. Goodbye!")
except Exception as e:
print(f"\nError: {e}")
import traceback
traceback.print_exc()