# Active Reconnaissance Tools for Bug Bounty MCP
**Date:** October 12, 2025
**Purpose:** Guide to active reconnaissance tools - current implementation and expansion opportunities
**Audience:** MCP server developers and security researchers
---
## 📋 Table of Contents
1. [Currently Implemented Active Recon Tools](#currently-implemented)
2. [Active Recon Tools That Can Be Added](#tools-to-add)
3. [Implementation Guide](#implementation-guide)
4. [Security Considerations](#security-considerations)
5. [Code Examples](#code-examples)
---
## 🔍 Currently Implemented Active Recon Tools
### Available in MCP Server:
#### 1. **Port Scanning** ✅
```python
Tool: nmap
Method: async def port_scan()
Location: src/bugbounty_mcp/tools/recon.py
Capabilities:
- Quick scan (top 1000 ports)
- Full scan (all 65535 ports)
- Custom port ranges
- Service detection
- OS fingerprinting
Usage:
mcp_bugbounty_port_scan(
program_id="example",
target="api.example.com",
scan_type="quick"
)
```
#### 2. **Subdomain Enumeration** ✅ (Passive + Active)
```python
Tool: subfinder
Method: async def subdomain_enum()
Location: src/bugbounty_mcp/tools/recon.py
Capabilities:
- Passive enumeration (certificate transparency, DNS databases)
- Active enumeration (DNS brute-forcing)
- Automatic scope filtering
- Caching for performance
Usage:
mcp_bugbounty_subdomain_enum(
program_id="example",
domain="example.com",
method="all" # passive, active, or all
)
```
#### 3. **DNS Enumeration** ✅ (dnsx backend planned)
```python
Tool: dnsx (planned)
Method: async def dns_enumeration()
Location: src/bugbounty_mcp/tools/recon.py
Capabilities:
- DNS record enumeration
- Zone transfer attempts
- DNS security checks
- DNSSEC validation
Usage:
mcp_bugbounty_dns_enumeration(
program_id="example",
domain="example.com"
)
```
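Since the dnsx backend is still planned, the zone-transfer check can be prototyped directly. Below is a minimal sketch using dnspython (an assumption on my part; the production tool may shell out to dnsx instead):

```python
# Minimal zone-transfer (AXFR) check with dnspython (`pip install dnspython`).
# Illustrative sketch only, not the MCP server's implementation.
import dns.query
import dns.resolver
import dns.zone

def try_zone_transfer(domain: str) -> dict:
    """Attempt AXFR against each authoritative nameserver for `domain`."""
    results = {}
    for ns in dns.resolver.resolve(domain, "NS"):
        ns_host = str(ns.target).rstrip(".")
        ns_ip = str(dns.resolver.resolve(ns_host, "A")[0])
        try:
            zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, timeout=10))
            # A successful transfer leaks every record in the zone
            results[ns_host] = sorted(str(name) for name in zone.nodes)
        except Exception:
            results[ns_host] = None  # transfer refused (the expected outcome)
    return results
```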
#### 4. **Technology Detection** ✅
```python
Tool: wappalyzer-cli
Method: async def technology_detection()
Location: src/bugbounty_mcp/tools/recon.py
Capabilities:
- Web technology fingerprinting
- Framework detection
- CMS identification
- Server technology discovery
Usage:
mcp_bugbounty_technology_detection(
url="https://example.com"
)
```
---
## 🚀 Active Recon Tools That Can Be Added
### High Priority (Should Add):
#### 1. **Advanced Subdomain Discovery**
```python
Tool: amass
Priority: HIGH
Reason: More comprehensive than subfinder
Capabilities:
- Active DNS enumeration
- DNS brute-forcing with wordlists
- Certificate transparency logs
- DNS zone walking
- Reverse DNS lookups
- ASN enumeration
Implementation Complexity: MEDIUM
Expected Function:
async def advanced_subdomain_enum(
program_id: str,
domain: str,
mode: str = "passive", # passive, active, hybrid
wordlist: Optional[str] = None,
resolvers: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Advanced subdomain enumeration using amass.
Returns:
{
'success': True,
'scan_id': 'uuid',
'domain': 'example.com',
'subdomains': ['api.example.com', 'app.example.com'],
'count': 150,
'method': 'hybrid',
'sources': ['cert_transparency', 'dns_bruteforce', 'web_scraping']
}
"""
```
#### 2. **Web Crawling & Spidering**
```python
Tool: gospider, hakrawler, katana
Priority: HIGH
Reason: Discover hidden endpoints and parameters
Capabilities:
- JavaScript parsing
- Link extraction
- Form discovery
- API endpoint detection
- Parameter extraction
- Sitemap parsing
Implementation Complexity: MEDIUM
Expected Function:
async def web_crawl(
program_id: str,
url: str,
depth: int = 3,
js_parsing: bool = True,
max_pages: int = 500,
) -> Dict[str, Any]:
"""
Crawl website to discover URLs and endpoints.
Returns:
{
'success': True,
'scan_id': 'uuid',
'base_url': 'https://example.com',
'urls_found': 347,
'endpoints': ['/api/v1/users', '/api/v1/content'],
'parameters': ['id', 'user_id', 'token'],
'forms': [{'action': '/login', 'method': 'POST'}],
'js_files': ['app.min.js', 'vendor.js']
}
"""
```
#### 3. **Active Directory/LDAP Enumeration**
```python
Tool: ldapsearch, enum4linux-ng
Priority: MEDIUM
Reason: Corporate network enumeration
Capabilities:
- LDAP queries
- User enumeration
- Group membership
- Service principal names
- Kerberos enumeration
Implementation Complexity: MEDIUM-HIGH
Expected Function:
async def ldap_enum(
program_id: str,
target: str,
port: int = 389,
auth: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""
Enumerate Active Directory/LDAP information.
Returns:
{
'success': True,
'scan_id': 'uuid',
'target': 'dc.example.com',
'users': ['user1@example.com', 'user2@example.com'],
'groups': ['Domain Admins', 'IT Support'],
'computers': ['WS001', 'WS002'],
'services': ['HTTP/web01.example.com']
}
"""
```
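To give a feel for the underlying LDAP query, here is a minimal sketch using the ldap3 library with an anonymous bind; `base_dn` and the attribute list are assumptions that must match the target directory:

```python
# Anonymous-bind user enumeration with the ldap3 library (`pip install ldap3`).
# base_dn and the requested attributes are assumptions, not defaults.
from ldap3 import ALL, SUBTREE, Connection, Server

def ldap_user_enum(host: str, base_dn: str, port: int = 389) -> list:
    """Return user principal names visible to an anonymous bind."""
    server = Server(host, port=port, get_info=ALL)
    conn = Connection(server, auto_bind=True)  # anonymous bind
    conn.search(
        search_base=base_dn,
        search_filter="(objectClass=person)",
        search_scope=SUBTREE,
        attributes=["cn", "userPrincipalName"],
    )
    users = [str(e.userPrincipalName) for e in conn.entries if e.userPrincipalName]
    conn.unbind()
    return users
```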
#### 4. **Network Discovery & Mapping**
```python
Tool: masscan, rustscan
Priority: HIGH
Reason: Faster port scanning for large ranges
Capabilities:
- Ultra-fast port scanning
- IP range scanning
- Banner grabbing
- Service enumeration
- Network topology mapping
Implementation Complexity: LOW
Expected Function:
async def network_scan(
program_id: str,
cidr: str, # e.g., "192.168.1.0/24"
ports: str = "top-100", # or "1-65535"
rate: int = 1000, # packets per second
) -> Dict[str, Any]:
"""
Fast network and port scanning.
Returns:
{
'success': True,
'scan_id': 'uuid',
'cidr': '192.168.1.0/24',
'hosts_found': 45,
'hosts': [
{
'ip': '192.168.1.10',
'open_ports': [22, 80, 443],
'services': {'80': 'http', '443': 'https'}
}
]
}
"""
```
#### 5. **Certificate Transparency Monitoring**
```python
Tool: certstream, crt.sh API
Priority: MEDIUM
Reason: Continuous subdomain discovery
Capabilities:
- Real-time certificate monitoring
- Historical certificate search
- Wildcard certificate discovery
- Subdomain extraction from certs
Implementation Complexity: LOW-MEDIUM
Expected Function:
async def cert_transparency_search(
program_id: str,
domain: str,
days_back: int = 30,
) -> Dict[str, Any]:
"""
Search certificate transparency logs.
Returns:
{
'success': True,
'scan_id': 'uuid',
'domain': 'example.com',
'certificates': [
{
'serial': '0x1234...',
'common_name': '*.example.com',
'san': ['example.com', '*.api.example.com'],
'issued_date': '2025-10-01',
'issuer': 'Let\'s Encrypt'
}
],
'new_subdomains': ['new-api.example.com']
}
"""
```
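crt.sh exposes a public JSON endpoint, so a first version of this tool needs no API key. A minimal sketch using requests; the field names match crt.sh's public output:

```python
# Minimal crt.sh lookup (no API key needed). Treat this as a sketch of
# the eventual MCP tool, not its final form.
import requests

def crtsh_subdomains(domain: str) -> set:
    """Pull subdomains for `domain` from certificate transparency logs."""
    resp = requests.get(
        "https://crt.sh/",
        params={"q": f"%.{domain}", "output": "json"},
        timeout=30,
    )
    resp.raise_for_status()
    subdomains = set()
    for cert in resp.json():
        # name_value may contain several names separated by newlines
        for name in cert.get("name_value", "").splitlines():
            name = name.strip().lstrip("*.")
            if name.endswith(domain):
                subdomains.add(name)
    return subdomains
```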
#### 6. **Cloud Asset Discovery**
```python
Tool: cloud_enum, ScoutSuite
Priority: MEDIUM
Reason: Discover cloud infrastructure
Capabilities:
- AWS S3 bucket enumeration
- Azure blob storage discovery
- Google Cloud Storage finding
- Cloud service enumeration
- Public cloud asset detection
Implementation Complexity: MEDIUM
Expected Function:
async def cloud_asset_enum(
program_id: str,
company_name: str,
clouds: List[str] = ["aws", "azure", "gcp"],
) -> Dict[str, Any]:
"""
Enumerate cloud assets for a company.
Returns:
{
'success': True,
'scan_id': 'uuid',
'company': 'example',
'aws': {
's3_buckets': ['example-backups', 'example-logs'],
'cloudfront': ['d111111abcdef8.cloudfront.net']
},
'azure': {
'blob_storage': ['examplestorage.blob.core.windows.net']
},
'gcp': {
'storage': ['example-bucket.storage.googleapis.com']
}
}
"""
```
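Bucket existence can be inferred from well-known HTTP status codes even before integrating cloud_enum. A minimal sketch; the candidate name permutations are illustrative only:

```python
# Unauthenticated S3 probe based on well-known status codes:
# 404 = no such bucket, 403 = exists but private, 200 = publicly listable.
# cloud_enum permutes far more candidate names than this sketch does.
import requests

def probe_s3_buckets(company: str) -> dict:
    """Check a few common bucket-name permutations for `company`."""
    candidates = [company, f"{company}-backups", f"{company}-logs", f"{company}-dev"]
    found = {}
    for name in candidates:
        try:
            status = requests.head(f"https://{name}.s3.amazonaws.com", timeout=10).status_code
        except requests.RequestException:
            continue
        if status == 200:
            found[name] = "public"
        elif status == 403:
            found[name] = "private"
    return found
```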
#### 7. **Email Harvesting**
```python
Tool: theHarvester, hunter.io API
Priority: LOW-MEDIUM
Reason: OSINT for social engineering
Capabilities:
- Email address discovery
- Employee enumeration
- Social media profiles
- Breach database searches
- Email format detection
Implementation Complexity: LOW
Expected Function:
async def email_harvest(
program_id: str,
domain: str,
sources: List[str] = ["google", "linkedin", "hunter"],
) -> Dict[str, Any]:
"""
Harvest email addresses and employee information.
Returns:
{
'success': True,
'scan_id': 'uuid',
'domain': 'example.com',
'emails': [
'john.doe@example.com',
'jane.smith@example.com'
],
'email_pattern': '{first}.{last}@example.com',
'employees': [
{'name': 'John Doe', 'title': 'Security Engineer', 'source': 'linkedin'}
]
}
"""
```
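A thin wrapper around the theHarvester CLI is enough to start. Sketch only: emails are scraped from stdout with a regex rather than relying on a specific report format, and only the documented `-d`/`-b` flags are assumed:

```python
# Thin wrapper around the theHarvester CLI; emails are pulled from stdout
# with a regex so the sketch does not depend on a report file format.
import re
import subprocess

def harvest_emails(domain: str, source: str = "duckduckgo") -> set:
    """Run theHarvester against one data source and extract emails."""
    proc = subprocess.run(
        ["theHarvester", "-d", domain, "-b", source],
        capture_output=True, text=True, timeout=300,
    )
    pattern = rf"[A-Za-z0-9._%+-]+@{re.escape(domain)}"
    return set(re.findall(pattern, proc.stdout))
```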
#### 8. **Screenshot & Visual Recon**
```python
Tool: gowitness, aquatone, eyewitness
Priority: MEDIUM
Reason: Visual asset discovery
Capabilities:
- Automated screenshots
- Visual similarity detection
- Technology detection via visuals
- HTTP response analysis
- Login page detection
Implementation Complexity: MEDIUM
Expected Function:
async def screenshot_recon(
program_id: str,
urls: List[str],
resolution: str = "1440x900",
) -> Dict[str, Any]:
"""
Take screenshots of URLs for visual analysis.
Returns:
{
'success': True,
'scan_id': 'uuid',
'screenshots': [
{
'url': 'https://example.com',
'screenshot_path': '/path/to/screenshot.png',
'http_status': 200,
'title': 'Example Site',
'has_login': True,
'technologies': ['WordPress', 'jQuery']
}
]
}
"""
```
#### 9. **Git & Code Repository Discovery**
```python
Tool: GitDorker, truffleHog
Priority: HIGH
Reason: Find exposed repositories and secrets
Capabilities:
- GitHub repository discovery
- GitLab/Bitbucket enumeration
- Exposed .git directory detection
- Secret scanning
- Code analysis
Implementation Complexity: MEDIUM
Expected Function:
async def git_recon(
program_id: str,
company_name: str,
scan_repos: bool = True,
) -> Dict[str, Any]:
"""
Discover Git repositories and potential secrets.
Returns:
{
'success': True,
'scan_id': 'uuid',
'company': 'example',
'repositories': [
{
'url': 'https://github.com/example/app',
'visibility': 'public',
'last_updated': '2025-10-01'
}
],
'exposed_git': ['https://example.com/.git/'],
'secrets_found': [
{
'type': 'API key',
'location': 'config.js',
'severity': 'high'
}
]
}
"""
```
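The exposed-.git check in particular is cheap to implement directly. A minimal sketch: a readable `/.git/HEAD` that begins with `ref:` usually means the full repository can be reconstructed:

```python
# Exposed-.git check. Sketch only; dedicated tooling (GitDorker,
# truffleHog) goes much further once a repository is found.
import requests

def check_exposed_git(base_url: str) -> bool:
    """Return True if base_url appears to expose its .git directory."""
    try:
        resp = requests.get(f"{base_url.rstrip('/')}/.git/HEAD", timeout=10)
    except requests.RequestException:
        return False
    return resp.status_code == 200 and resp.text.strip().startswith("ref:")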
#### 10. **API Discovery & Enumeration**
```python
Tool: APIFuzzer, Arjun
Priority: HIGH
Reason: Modern apps are API-heavy
Capabilities:
- API endpoint discovery
- GraphQL schema enumeration
- REST API structure mapping
- Parameter discovery
- API documentation finding
Implementation Complexity: MEDIUM
Expected Function:
async def api_discovery(
program_id: str,
base_url: str,
api_type: str = "auto", # rest, graphql, soap, auto
) -> Dict[str, Any]:
"""
Discover and enumerate API endpoints.
Returns:
{
'success': True,
'scan_id': 'uuid',
'base_url': 'https://api.example.com',
'api_type': 'rest',
'endpoints': [
{
'path': '/v1/users',
'methods': ['GET', 'POST'],
'parameters': ['id', 'limit', 'offset'],
'auth_required': True
}
],
'graphql_schema': {
'queries': ['getUser', 'listUsers'],
'mutations': ['createUser', 'deleteUser']
}
}
"""
```
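For the GraphQL case, the introspection document is defined by the GraphQL spec itself, so detection can be sketched without Arjun. Minimal sketch; whether the target leaves introspection enabled varies:

```python
# GraphQL detection via standard introspection. The query is part of the
# GraphQL spec; the endpoint URL is whatever earlier recon discovered.
import requests

INTROSPECTION = "{ __schema { queryType { fields { name } } mutationType { fields { name } } } }"

def graphql_introspect(endpoint: str) -> dict:
    """Return query/mutation names if introspection is enabled."""
    resp = requests.post(endpoint, json={"query": INTROSPECTION}, timeout=15)
    schema = (resp.json().get("data") or {}).get("__schema") or {}

    def names(section: str) -> list:
        return [f["name"] for f in (schema.get(section) or {}).get("fields", [])]

    return {"queries": names("queryType"), "mutations": names("mutationType")}
```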
---
## 🛠️ Implementation Guide
### Step 1: Install Required Tools
```bash
# Navigate to project directory
cd /home/orglobal/Dev/bug-bounty-mcp
# Install tools
sudo apt-get update
# Amass
sudo apt-get install -y amass
# Masscan (fast port scanner)
sudo apt-get install -y masscan
# Gospider (web crawler)
go install github.com/jaeles-project/gospider@latest
# Gowitness (screenshots)
go install github.com/sensepost/gowitness@latest
# cloud_enum (not packaged on PyPI; install from source)
git clone https://github.com/initstring/cloud_enum.git
pip install -r cloud_enum/requirements.txt
# TheHarvester
pip install theHarvester
# TruffleHog (secret scanning)
pip install truffleHog
```
### Step 2: Update recon.py with New Tools
```python
# File: src/bugbounty_mcp/tools/recon.py
class ReconTools:
"""Reconnaissance tools for bug bounty hunting."""
# ... existing code ...
async def advanced_subdomain_enum(
self,
program_id: str,
domain: str,
mode: str = "passive",
wordlist: Optional[str] = None,
) -> Dict[str, Any]:
"""Advanced subdomain enumeration using amass."""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(domain)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
# Create scan record
scan_id = str(uuid.uuid4())
# Build amass command
args = ["enum", "-d", domain]
if mode == "passive":
args.append("-passive")
elif mode == "active":
args.append("-active")
if wordlist:
args.extend(["-brute", "-w", wordlist])
try:
# Execute amass
result = await self.executor.execute(
"amass",
args,
timeout=600, # 10 minutes
)
if result.success:
subdomains = result.output.strip().split('\n')
subdomains = [s.strip() for s in subdomains if s.strip()]
# Filter in-scope
in_scope = validator.filter_in_scope_targets(subdomains)
response = {
'success': True,
'scan_id': scan_id,
'domain': domain,
'subdomains': sorted(in_scope),
'count': len(in_scope),
'method': mode,
}
# Cache results
cache_key = f"amass_{program_id}_{domain}_{mode}"
self.cache.set(cache_key, response, ttl=86400)
return response
else:
return {
'success': False,
'error': 'Amass execution failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in advanced subdomain enum: {str(e)}")
return {'success': False, 'error': str(e)}
async def web_crawl(
self,
program_id: str,
url: str,
depth: int = 3,
max_pages: int = 500,
) -> Dict[str, Any]:
"""Crawl website to discover URLs and endpoints."""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(url)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
scan_id = str(uuid.uuid4())
        # Build gospider command. Note: gospider's -c flag sets request
        # concurrency (not a page cap); max_pages is enforced after parsing.
        args = [
            "-s", url,
            "-d", str(depth),
            "-c", "10",  # concurrent requests
            "--json",
            "--sitemap",
            "--robots",
        ]
try:
result = await self.executor.execute(
"gospider",
args,
timeout=600,
)
            if result.success:
                # Parse JSON output and cap results at max_pages
                data = self.parser.parse_gospider_output(result.output)
                urls = data.get('urls', [])[:max_pages]
                response = {
                    'success': True,
                    'scan_id': scan_id,
                    'base_url': url,
                    'urls_found': len(urls),
                    'endpoints': data.get('endpoints', []),
                    'parameters': list(set(data.get('parameters', []))),
                    'forms': data.get('forms', []),
                    'js_files': data.get('js_files', []),
                }
# Cache
cache_key = f"crawl_{program_id}_{url}"
self.cache.set(cache_key, response, ttl=43200) # 12 hours
return response
else:
return {
'success': False,
'error': 'Web crawl failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in web crawl: {str(e)}")
return {'success': False, 'error': str(e)}
async def network_scan(
self,
program_id: str,
cidr: str,
ports: str = "top-100",
rate: int = 1000,
) -> Dict[str, Any]:
"""Fast network scanning using masscan."""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(cidr)
if not is_valid:
return {'success': False, 'error': f"Target validation failed: {reason}"}
scan_id = str(uuid.uuid4())
        # Build masscan command. masscan does not accept "top-100" as a -p
        # value, so translate it to --top-ports; "-oJ -" writes JSON to stdout
        if ports.startswith("top-"):
            args = [cidr, "--top-ports", ports.split("-", 1)[1]]
        else:
            args = [cidr, "-p", ports]
        args += ["--rate", str(rate), "-oJ", "-"]
try:
result = await self.executor.execute(
"masscan",
args,
timeout=900,
requires_root=True, # masscan needs root
)
if result.success:
# Parse JSON output
data = self.parser.parse_masscan_output(result.output)
response = {
'success': True,
'scan_id': scan_id,
'cidr': cidr,
'hosts_found': len(data.get('hosts', [])),
'hosts': data.get('hosts', []),
}
return response
else:
return {
'success': False,
'error': 'Network scan failed',
'details': result.errors,
}
except Exception as e:
logger.error(f"Error in network scan: {str(e)}")
return {'success': False, 'error': str(e)}
```
### Step 3: Register New Tools in server.py
```python
# File: src/bugbounty_mcp/server.py
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List all available tools."""
return [
# ... existing tools ...
Tool(
name="advanced_subdomain_enum",
description="Advanced subdomain enumeration using amass (passive/active/hybrid)",
inputSchema={
"type": "object",
"properties": {
"program_id": {"type": "string", "description": "Program identifier"},
"domain": {"type": "string", "description": "Domain to enumerate"},
"mode": {
"type": "string",
"enum": ["passive", "active", "hybrid"],
"default": "passive",
"description": "Enumeration mode"
},
"wordlist": {"type": "string", "description": "Custom wordlist path (optional)"},
},
"required": ["program_id", "domain"]
}
),
Tool(
name="web_crawl",
description="Crawl website to discover URLs, endpoints, and parameters",
inputSchema={
"type": "object",
"properties": {
"program_id": {"type": "string", "description": "Program identifier"},
"url": {"type": "string", "description": "Base URL to crawl"},
"depth": {"type": "number", "default": 3, "description": "Crawl depth"},
"max_pages": {"type": "number", "default": 500, "description": "Maximum pages to crawl"},
},
"required": ["program_id", "url"]
}
),
Tool(
name="network_scan",
description="Fast network and port scanning using masscan",
inputSchema={
"type": "object",
"properties": {
"program_id": {"type": "string", "description": "Program identifier"},
"cidr": {"type": "string", "description": "CIDR range (e.g., 192.168.1.0/24)"},
"ports": {"type": "string", "default": "top-100", "description": "Ports to scan"},
"rate": {"type": "number", "default": 1000, "description": "Packets per second"},
},
"required": ["program_id", "cidr"]
}
),
# Add more tools here...
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"""Handle tool execution."""
# ... existing tool routing ...
elif name == "advanced_subdomain_enum":
result = await recon_tools.advanced_subdomain_enum(**arguments)
elif name == "web_crawl":
result = await recon_tools.web_crawl(**arguments)
elif name == "network_scan":
result = await recon_tools.network_scan(**arguments)
# ... rest of the code ...
```
### Step 4: Add Parser Methods
```python
# File: src/bugbounty_mcp/utils/parser.py
class OutputParser:
"""Parse output from various security tools."""
# ... existing methods ...
    def parse_gospider_output(self, output: str) -> Dict[str, Any]:
        """Parse gospider JSON output (one JSON object per line)."""
        import json
        urls = []
        endpoints = []
        parameters = []
        forms = []
        js_files = []
        for line in output.strip().split('\n'):
            if not line:
                continue
            try:
                data = json.loads(line)
                # gospider puts the discovered value in the "output" field
                found = data.get('output', '')
                if data.get('type') == 'url':
                    urls.append(found)
                elif data.get('type') == 'form':
                    forms.append(data)
                elif data.get('type') == 'javascript':
                    js_files.append(found)
                # Flag API-looking paths as endpoints
                if any(p in found.lower() for p in ('/api/', '/v1/', '/v2/', '/graphql')):
                    endpoints.append(found)
                # Extract parameter names from query strings
                if '?' in found:
                    query_string = found.split('?', 1)[1]
                    params = [p.split('=')[0] for p in query_string.split('&')]
                    parameters.extend(params)
            except json.JSONDecodeError:
                continue
        return {
            'urls': urls,
            'endpoints': list(set(endpoints)),
            'parameters': parameters,
            'forms': forms,
            'js_files': js_files,
        }
    def parse_masscan_output(self, output: str) -> Dict[str, Any]:
        """Parse masscan JSON output (-oJ emits one array entry per line)."""
        import json
        hosts = {}
        for line in output.strip().split('\n'):
            # Strip the trailing comma masscan appends to each array entry,
            # and skip the enclosing brackets and comment lines
            line = line.strip().rstrip(',')
            if not line or line in ('[', ']') or line.startswith('#'):
                continue
try:
data = json.loads(line)
if 'ip' in data and 'ports' in data:
ip = data['ip']
port_data = data['ports'][0]
port = port_data.get('port')
service = port_data.get('service', 'unknown')
if ip not in hosts:
hosts[ip] = {
'ip': ip,
'open_ports': [],
'services': {}
}
hosts[ip]['open_ports'].append(port)
hosts[ip]['services'][str(port)] = service
except json.JSONDecodeError:
continue
return {'hosts': list(hosts.values())}
```
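A quick sanity check for the parser, using a line that mirrors the shape of masscan's `-oJ` entries (trailing comma included):

```python
# Sanity check for parse_masscan_output with a representative -oJ line.
parser = OutputParser()
sample = '{"ip": "192.168.1.10", "ports": [{"port": 80, "proto": "tcp", "status": "open"}]},'
result = parser.parse_masscan_output(sample)
assert result["hosts"][0]["open_ports"] == [80]
```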
---
## 🔒 Security Considerations
### Important Safeguards:
#### 1. **Scope Validation** (CRITICAL)
```python
# ALWAYS validate before active reconnaissance
validator = ScopeValidator(program)
is_valid, reason = validator.validate_target(target)
if not is_valid:
return {'success': False, 'error': f"Out of scope: {reason}"}
```
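The real `ScopeValidator` lives elsewhere in the codebase; purely as an illustration, wildcard scope matching can be as simple as the sketch below (assuming scope is stored as patterns like `*.example.com`):

```python
# Illustrative wildcard scope matching; NOT the project's ScopeValidator.
import fnmatch

def in_scope(target: str, scope_patterns: list) -> bool:
    """True if `target`'s host matches any allowed scope pattern."""
    host = target.split("://")[-1].split("/")[0]  # strip scheme and path
    return any(fnmatch.fnmatch(host, pat) for pat in scope_patterns)

assert in_scope("https://api.example.com/v1", ["*.example.com"])
```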
#### 2. **Rate Limiting**
```python
# Implement rate limiting for aggressive tools
import asyncio
import time

class RateLimiter:
    def __init__(self, max_rate: int = 1000):
        self.max_rate = max_rate          # max requests per second
        self.last_check = time.time()
        self.request_count = 0

    async def throttle(self):
        """Block until another request is allowed under the rate limit."""
        current_time = time.time()
        elapsed = current_time - self.last_check
        if elapsed >= 1.0:
            # New one-second window: reset the counter
            self.request_count = 0
            self.last_check = current_time
        if self.request_count >= self.max_rate:
            # Window exhausted: sleep out the remainder of the second
            await asyncio.sleep(max(0.0, 1.0 - elapsed))
            self.request_count = 0
            self.last_check = time.time()
        self.request_count += 1
```
#### 3. **Audit Logging**
```python
# Log ALL active reconnaissance
self.db.log_audit(AuditLogEntry(
action="network_scan",
program_id=program_id,
target=cidr,
tool="masscan",
success=True,
timestamp=datetime.now(),
details={'ports_scanned': ports, 'rate': rate},
))
```
#### 4. **Privilege Management**
```python
# Some tools require elevated privileges
async def execute_with_sudo(self, tool: str, args: List[str]):
"""Execute tool with sudo (for masscan, etc.)."""
# Check if running in safe environment
if not self.config.allow_privileged_tools:
raise PermissionError("Privileged tools disabled in config")
# Execute with sudo
cmd = ["sudo", tool] + args
result = await self.executor.execute_raw(cmd)
return result
```
#### 5. **Resource Limits**
```python
# Prevent DoS from excessive scanning
MAX_CONCURRENT_SCANS = 3
MAX_SCAN_DURATION = 1800 # 30 minutes
MAX_TARGETS_PER_SCAN = 10000
# Enforce limits
if len(targets) > MAX_TARGETS_PER_SCAN:
return {'success': False, 'error': 'Too many targets'}
```
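One way to enforce these limits is a module-level semaphore plus a per-scan timeout. Sketch only; the real executor may enforce this differently:

```python
# Semaphore caps concurrent scans; wait_for aborts scans that exceed the
# duration budget. Constants repeat the values from the snippet above.
import asyncio

MAX_CONCURRENT_SCANS = 3
MAX_SCAN_DURATION = 1800  # 30 minutes

scan_semaphore = asyncio.Semaphore(MAX_CONCURRENT_SCANS)

async def run_limited(scan_coro):
    """Run a scan coroutine under the concurrency and duration limits."""
    async with scan_semaphore:
        try:
            return await asyncio.wait_for(scan_coro, timeout=MAX_SCAN_DURATION)
        except asyncio.TimeoutError:
            return {'success': False, 'error': 'Scan exceeded maximum duration'}
```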
---
## 📚 Code Examples
### Example 1: Complete Implementation of Web Crawl Tool
```python
# File: src/bugbounty_mcp/tools/active_recon.py
class ActiveReconTools:
"""Advanced active reconnaissance tools."""
    def __init__(self, config, db, cache, executor):
        self.config = config
        self.db = db
        self.cache = cache
        self.executor = executor
        self.parser = OutputParser()  # used by the _run_* parsing helpers
        self.rate_limiter = RateLimiter(max_rate=100)
async def comprehensive_web_crawl(
self,
program_id: str,
url: str,
depth: int = 3,
js_analysis: bool = True,
screenshot: bool = False,
) -> Dict[str, Any]:
"""
Comprehensive web crawling with multiple tools.
Combines:
- gospider for general crawling
- katana for JS analysis
- gowitness for screenshots
"""
# Validate
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': 'Program not found'}
validator = ScopeValidator(program)
if not validator.validate_target(url)[0]:
return {'success': False, 'error': 'Target out of scope'}
scan_id = str(uuid.uuid4())
results = {
'success': True,
'scan_id': scan_id,
'base_url': url,
}
try:
# Phase 1: Basic crawl with gospider
logger.info(f"Starting basic crawl: {url}")
gospider_result = await self._run_gospider(url, depth)
results['gospider'] = gospider_result
# Phase 2: JS analysis with katana (if enabled)
if js_analysis:
logger.info(f"Starting JS analysis: {url}")
katana_result = await self._run_katana(url, depth)
results['katana'] = katana_result
# Phase 3: Screenshots (if enabled)
if screenshot:
logger.info(f"Taking screenshots: {url}")
discovered_urls = gospider_result.get('urls', [])[:50] # Limit
screenshot_result = await self._run_gowitness(discovered_urls)
results['screenshots'] = screenshot_result
# Combine and deduplicate results
all_urls = set()
all_urls.update(gospider_result.get('urls', []))
if js_analysis:
all_urls.update(katana_result.get('urls', []))
# Filter in-scope
in_scope_urls = validator.filter_in_scope_targets(list(all_urls))
# Extract interesting data
endpoints = self._extract_api_endpoints(in_scope_urls)
parameters = self._extract_parameters(in_scope_urls)
results.update({
'total_urls': len(in_scope_urls),
'urls': sorted(in_scope_urls)[:1000], # Limit output
'api_endpoints': endpoints,
'parameters': list(set(parameters)),
})
# Cache results
cache_key = f"web_crawl_{program_id}_{url}"
self.cache.set(cache_key, results, ttl=43200)
# Audit log
self.db.log_audit(AuditLogEntry(
action="web_crawl",
program_id=program_id,
target=url,
tool="gospider+katana",
success=True,
details={'urls_found': len(in_scope_urls)},
))
return results
except Exception as e:
logger.error(f"Error in web crawl: {str(e)}")
return {
'success': False,
'scan_id': scan_id,
'error': str(e),
}
async def _run_gospider(self, url: str, depth: int) -> Dict[str, Any]:
"""Run gospider crawler."""
args = ["-s", url, "-d", str(depth), "--json"]
result = await self.executor.execute("gospider", args, timeout=300)
if result.success:
return self.parser.parse_gospider_output(result.output)
return {'urls': [], 'error': 'Gospider failed'}
    async def _run_katana(self, url: str, depth: int) -> Dict[str, Any]:
        """Run katana for JS-heavy sites."""
        args = [
            "-u", url,
            "-d", str(depth),
            "-js-crawl",
            "-json",
        ]
        result = await self.executor.execute("katana", args, timeout=300)
        if result.success:
            # parse_katana_output must be added to OutputParser alongside
            # the Step 4 parsers; katana emits one JSON object per line
            return self.parser.parse_katana_output(result.output)
        return {'urls': [], 'error': 'Katana failed'}
async def _run_gowitness(self, urls: List[str]) -> Dict[str, Any]:
"""Take screenshots with gowitness."""
# Create temp file with URLs
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write('\n'.join(urls))
url_file = f.name
args = [
"file",
"-f", url_file,
"--screenshot-path", "./screenshots",
]
result = await self.executor.execute("gowitness", args, timeout=600)
# Cleanup
os.unlink(url_file)
if result.success:
return {'screenshot_count': len(urls), 'path': './screenshots'}
return {'screenshot_count': 0, 'error': 'Gowitness failed'}
def _extract_api_endpoints(self, urls: List[str]) -> List[str]:
"""Extract potential API endpoints from URLs."""
api_patterns = ['/api/', '/v1/', '/v2/', '/graphql', '/rest/']
endpoints = []
for url in urls:
for pattern in api_patterns:
if pattern in url.lower():
endpoints.append(url)
break
return list(set(endpoints))
def _extract_parameters(self, urls: List[str]) -> List[str]:
"""Extract unique parameters from URLs."""
from urllib.parse import urlparse, parse_qs
parameters = []
for url in urls:
if '?' in url:
parsed = urlparse(url)
params = parse_qs(parsed.query)
parameters.extend(params.keys())
return list(set(parameters))
```
### Example 2: Usage in Assessment
```python
# How to use the new tools in an assessment:
async def run_active_recon(program_id: str, target: str, has_ip_scope: bool = False):
"""Run comprehensive active reconnaissance."""
results = {}
# 1. Advanced subdomain enumeration
print("🔍 Running advanced subdomain enumeration...")
subdomains = await mcp_bugbounty_advanced_subdomain_enum(
program_id=program_id,
domain=target,
mode="hybrid", # passive + active
)
results['subdomains'] = subdomains
print(f"Found {subdomains['count']} subdomains")
# 2. Web crawling on main site
print("🕷️ Crawling website...")
crawl_results = await mcp_bugbounty_web_crawl(
program_id=program_id,
url=f"https://{target}",
depth=3,
max_pages=500,
)
results['crawl'] = crawl_results
print(f"Discovered {crawl_results['urls_found']} URLs")
# 3. Network scanning (if IP range in scope)
if has_ip_scope:
print("🌐 Scanning network range...")
network = await mcp_bugbounty_network_scan(
program_id=program_id,
cidr="192.168.1.0/24",
ports="1-1000",
rate=500,
)
results['network'] = network
print(f"Found {network['hosts_found']} active hosts")
# 4. API discovery on discovered endpoints
print("🔌 Discovering APIs...")
api_results = await mcp_bugbounty_api_discovery(
program_id=program_id,
base_url=f"https://api.{target}",
)
results['apis'] = api_results
return results
```
---
## 🎯 Recommended Implementation Priority
### Phase 1 (Weeks 1-2): Essential Tools
1. Advanced subdomain enumeration (amass)
2. Web crawling (gospider/katana)
3. Fast network scanning (masscan/rustscan)
### Phase 2 (Weeks 3-4): Enhanced Capabilities
4. API discovery and enumeration
5. Screenshot/visual recon
6. Certificate transparency monitoring
### Phase 3 (Month 2): Advanced Features
7. Cloud asset discovery
8. Git repository scanning
9. Email harvesting
10. LDAP/Active Directory enumeration
---
## 📞 Getting Started
### Quick Start:
```bash
# 1. Install a new tool (example: amass)
go install -v github.com/owasp-amass/amass/v4/...@master
# 2. Add to recon.py
# Copy the code examples above
# 3. Register in server.py
# Add tool definition and routing
# 4. Test it
python -m bugbounty_mcp.server
# 5. Use it via MCP
mcp_bugbounty_advanced_subdomain_enum(
program_id="test",
domain="example.com",
mode="passive"
)
```
---
**Active recon tools can and should be added to the MCP server. This guide provides everything needed to implement them safely and effectively.** 🚀