"""Helper utilities for Bug Bounty Hunter MCP"""
import asyncio
import subprocess
import json
import os
from typing import Optional, Dict, List, Any
from pathlib import Path
import hashlib
import time
from datetime import datetime
async def run_command(
    command: str,
    timeout: int = 300,
    shell: bool = True,
    capture_output: bool = True
) -> Dict[str, Any]:
    """
    Run a shell command asynchronously.

    Args:
        command: Command to execute. With shell=True it is passed to the
            shell unescaped — never feed it untrusted input.
        timeout: Timeout in seconds.
        shell: Execute via the shell; if False, the command is split on
            whitespace and exec'd directly (no quoting support).
        capture_output: Capture stdout/stderr.

    Returns:
        Dictionary with keys ``returncode``, ``stdout``, ``stderr`` and
        ``success``. On timeout or launch failure, ``returncode`` is -1
        and ``stderr`` carries the error message.
    """
    process = None
    try:
        pipe = asyncio.subprocess.PIPE if capture_output else None
        if shell:
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=pipe,
                stderr=pipe
            )
        else:
            process = await asyncio.create_subprocess_exec(
                *command.split(),
                stdout=pipe,
                stderr=pipe
            )
        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=timeout
        )
        return {
            "returncode": process.returncode,
            "stdout": stdout.decode('utf-8', errors='ignore') if stdout else "",
            "stderr": stderr.decode('utf-8', errors='ignore') if stderr else "",
            "success": process.returncode == 0
        }
    except asyncio.TimeoutError:
        # Kill AND reap the child; without wait() the killed process
        # lingers as a zombie until the event loop shuts down.
        if process is not None:
            try:
                process.kill()
            except ProcessLookupError:
                pass  # process exited between the timeout and the kill
            await process.wait()
        return {
            "returncode": -1,
            "stdout": "",
            "stderr": f"Command timed out after {timeout} seconds",
            "success": False
        }
    except Exception as e:
        # Launch failures (missing binary, bad args) are reported in-band.
        return {
            "returncode": -1,
            "stdout": "",
            "stderr": str(e),
            "success": False
        }
def parse_output(output: str, format: str = "lines") -> Any:
    """
    Parse command output.

    Args:
        output: Output string.
        format: Output format ("lines", "json", "csv"). Any other value
            returns the string unchanged.

    Returns:
        "lines": list of non-empty stripped lines.
        "json": parsed object, or {"error": ..., "raw": ...} on bad JSON.
        "csv": list of dicts keyed by the header row; rows whose field
            count differs from the header are skipped. Quoted fields
            (e.g. "a,b") are handled via the stdlib csv parser.
    """
    if format == "lines":
        return [line.strip() for line in output.split('\n') if line.strip()]
    if format == "json":
        try:
            return json.loads(output)
        except json.JSONDecodeError:
            return {"error": "Invalid JSON", "raw": output}
    if format == "csv":
        import csv  # local import keeps this helper self-contained
        lines = output.strip().split('\n')
        if not lines:
            return []
        reader = csv.reader(lines)
        try:
            header = next(reader)
        except StopIteration:
            return []
        # Keep the original behavior of dropping malformed rows.
        return [dict(zip(header, row)) for row in reader if len(row) == len(header)]
    return output
def save_results(
    results: Dict,
    output_dir: str,
    filename: str,
    format: str = "json"
) -> str:
    """
    Save results to a file.

    Args:
        results: Results dictionary (txt format also accepts lists or
            arbitrary objects).
        output_dir: Output directory (created if missing).
        filename: Filename within output_dir.
        format: Output format ("json" or "txt"). Any other value writes
            nothing but still returns the path.

    Returns:
        Path to the (intended) output file.
    """
    # Create output directory if it doesn't exist
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    filepath = os.path.join(output_dir, filename)
    if format == "json":
        # Explicit UTF-8: the platform default encoding (e.g. cp1252 on
        # Windows) would corrupt or reject non-ASCII scan output.
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
    elif format == "txt":
        with open(filepath, 'w', encoding='utf-8') as f:
            if isinstance(results, dict):
                for key, value in results.items():
                    f.write(f"{key}: {value}\n")
            elif isinstance(results, list):
                for item in results:
                    f.write(f"{item}\n")
            else:
                f.write(str(results))
    return filepath
def generate_hash(text: str, algorithm: str = "md5") -> str:
    """
    Hash a text string and return the hex digest.

    Args:
        text: Text to hash (UTF-8 encoded before hashing).
        algorithm: Hash algorithm ("md5", "sha1", "sha256"); anything
            else silently falls back to md5.

    Returns:
        Hexadecimal digest string.
    """
    # Dispatch table replaces the if/elif chain; md5 is the fallback.
    constructors = {
        "md5": hashlib.md5,
        "sha1": hashlib.sha1,
        "sha256": hashlib.sha256,
    }
    make_hash = constructors.get(algorithm, hashlib.md5)
    return make_hash(text.encode()).hexdigest()
def get_timestamp() -> str:
    """
    Return the current local time as a filesystem-safe string.

    Returns:
        Timestamp formatted as YYYY-MM-DD_HH-MM-SS.
    """
    now = datetime.now()
    return now.strftime("%Y-%m-%d_%H-%M-%S")
def deduplicate_list(items: List[Any]) -> List[Any]:
    """
    Remove duplicates from a list while preserving first-seen order.

    Hashable items use a set for O(1) membership checks; unhashable
    items (e.g. the result dicts that merge_results feeds in) fall back
    to a linear equality scan instead of raising TypeError.

    Args:
        items: List of items.

    Returns:
        New list with later duplicates removed.
    """
    seen = set()
    unhashable: List[Any] = []
    result: List[Any] = []
    for item in items:
        try:
            if item in seen:
                continue
            seen.add(item)
        except TypeError:
            # item is unhashable (dict/list) — O(n) fallback
            if item in unhashable:
                continue
            unhashable.append(item)
        result.append(item)
    return result
def merge_results(results_list: List[Dict]) -> Dict:
    """
    Merge multiple result dictionaries.

    Lists under the same key are concatenated and deduplicated; dicts
    are shallow-merged (later values win); scalars are overwritten by
    later entries. Input dictionaries are never mutated.

    Args:
        results_list: List of result dictionaries.

    Returns:
        New merged dictionary.
    """
    def _copied(value):
        # Copy containers before storing so later extend()/update()
        # calls never mutate the caller's input data (the original
        # implementation stored references and mutated them in place).
        if isinstance(value, list):
            return list(value)
        if isinstance(value, dict):
            return dict(value)
        return value

    merged: Dict = {}
    for results in results_list:
        for key, value in results.items():
            if key not in merged:
                merged[key] = _copied(value)
            elif isinstance(value, list) and isinstance(merged[key], list):
                merged[key].extend(value)
                merged[key] = deduplicate_list(merged[key])
            elif isinstance(value, dict) and isinstance(merged[key], dict):
                merged[key].update(value)
            else:
                merged[key] = _copied(value)
    return merged
def format_size(size_bytes: int) -> str:
    """
    Format a byte count as a human-readable size.

    Args:
        size_bytes: Size in bytes.

    Returns:
        String such as "1.5 MB"; values of 1024 TB or more render as PB.
    """
    amount = float(size_bytes)
    for suffix in ('B', 'KB', 'MB', 'GB', 'TB'):
        if amount < 1024.0:
            return f"{amount:.1f} {suffix}"
        amount /= 1024.0
    return f"{amount:.1f} PB"
def rate_limit_delay(requests_per_second: int) -> float:
    """
    Compute the per-request delay that achieves a target request rate.

    Args:
        requests_per_second: Target requests per second.

    Returns:
        Delay in seconds; 0 when the rate is zero or negative
        (meaning "no throttling").
    """
    return 1.0 / requests_per_second if requests_per_second > 0 else 0
async def rate_limited_task(
    task_func,
    items: List,
    rate_limit: int = 150,
    concurrent: int = 50
) -> List:
    """
    Run an async task over many items with rate limiting and bounded
    concurrency.

    Args:
        task_func: Async function invoked once per item.
        items: Items to process.
        rate_limit: Target requests per second (<= 0 disables the pause).
        concurrent: Maximum number of tasks running at once.

    Returns:
        Results in the same order as *items*; exceptions are returned
        in place of results rather than raised.
    """
    gate = asyncio.Semaphore(concurrent)
    # Per-task pause that spaces requests out to the target rate.
    pause = 1.0 / rate_limit if rate_limit > 0 else 0

    async def throttled(entry):
        async with gate:
            outcome = await task_func(entry)
            if pause > 0:
                await asyncio.sleep(pause)
            return outcome

    return await asyncio.gather(
        *(throttled(entry) for entry in items),
        return_exceptions=True,
    )
def load_wordlist(wordlist_path: str, limit: Optional[int] = None) -> List[str]:
    """
    Load a wordlist from a file.

    Blank lines and '#' comment lines are skipped. The limit caps the
    number of raw file lines read (including skipped ones), per the
    original line-based semantics.

    Args:
        wordlist_path: Path to the wordlist file.
        limit: Maximum number of lines to read; None means no limit.

    Returns:
        List of words; empty if the file does not exist.
    """
    if not os.path.exists(wordlist_path):
        return []
    words: List[str] = []
    with open(wordlist_path, 'r', encoding='utf-8', errors='ignore') as f:
        for i, line in enumerate(f):
            # `is not None` so an explicit limit of 0 loads nothing;
            # the old truthiness check silently treated 0 as "no limit".
            if limit is not None and i >= limit:
                break
            word = line.strip()
            if word and not word.startswith('#'):
                words.append(word)
    return words
def create_output_directory(base_dir: str, target: str) -> str:
    """
    Create a timestamped output directory for a scan target.

    Args:
        base_dir: Base output directory.
        target: Target name; '://', '/' and '.' are replaced with '_'
            to make it filesystem-safe.

    Returns:
        Path of the created directory.
    """
    sanitized = target.replace('://', '_').replace('/', '_').replace('.', '_')
    directory = Path(base_dir) / f"{sanitized}_{get_timestamp()}"
    directory.mkdir(parents=True, exist_ok=True)
    return str(directory)
def filter_by_status_code(
    results: List[Dict],
    include_status: Optional[List[int]] = None,
    exclude_status: Optional[List[int]] = None
) -> List[Dict]:
    """
    Filter results by HTTP status code.

    Args:
        results: List of result dictionaries (status read from the
            'status_code' key, defaulting to 0 when absent).
        include_status: If non-empty, keep only these status codes.
        exclude_status: If non-empty, drop these status codes.

    Returns:
        New filtered list.
    """
    def keep(entry: Dict) -> bool:
        code = entry.get('status_code', 0)
        if include_status and code not in include_status:
            return False
        if exclude_status and code in exclude_status:
            return False
        return True

    return [entry for entry in results if keep(entry)]
def calculate_scan_time(start_time: float) -> str:
    """
    Format the elapsed time since a time.time() start point.

    Args:
        start_time: Start time from time.time().

    Returns:
        Compact duration string, e.g. "2m 5s" or "1h 0m 3s"; units that
        would be a leading zero are omitted.
    """
    elapsed = int(time.time() - start_time)
    hours, remainder = divmod(elapsed, 3600)
    minutes, seconds = divmod(remainder, 60)
    if hours:
        return f"{hours}h {minutes}m {seconds}s"
    if minutes:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"