"""Traffic interception and analysis using mitmproxy."""
import json
import shlex
import uuid
from pathlib import Path
from typing import Dict, Any, Optional, List
import logging
from ..config import ConfigManager
from ..storage.database import DatabaseManager
from ..storage.cache import CacheManager
from ..utils.executor import ToolExecutor
from ..utils.validators import ScopeValidator
logger = logging.getLogger(__name__)
class ProxyTools:
"""Traffic interception and analysis tools using mitmproxy."""
def __init__(
self,
config: ConfigManager,
db: DatabaseManager,
cache: CacheManager,
executor: ToolExecutor,
):
"""Initialize proxy tools.
Args:
config: Configuration manager
db: Database manager
cache: Cache manager
executor: Tool executor
"""
self.config = config
self.db = db
self.cache = cache
self.executor = executor
async def start_traffic_intercept(
self,
program_id: str,
listen_host: str = "127.0.0.1",
listen_port: int = 8080,
mode: str = "regular",
save_flows: bool = True,
filter_pattern: Optional[str] = None,
) -> Dict[str, Any]:
"""Start mitmproxy/mitmdump for traffic interception.
Args:
program_id: Program identifier
listen_host: Host to listen on
listen_port: Port to listen on
mode: Mode (regular, transparent, reverse, upstream, socks5)
save_flows: Whether to save captured flows
filter_pattern: Optional filter pattern for traffic
Returns:
Dictionary with proxy server details and instructions
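
        Example (a sketch; assumes an initialized ProxyTools instance named
        "proxy" and a configured program id "example-program"):

            info = await proxy.start_traffic_intercept(
                "example-program", listen_port=8081
            )
            if info["success"]:
                print(info["instructions"]["manual_start"])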
"""
# Validate program
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
session_id = str(uuid.uuid4())[:8]
# Create output directory
output_dir = Path(f"./data/proxy_sessions/{program_id}/{session_id}")
output_dir.mkdir(parents=True, exist_ok=True)
        flow_file = (output_dir / "flows.mitm") if save_flows else None
# Build mitmdump command
        args = [
            "--listen-host", listen_host,
            "--listen-port", str(listen_port),
            "--mode", mode,
        ]
if save_flows:
args.extend(["-w", str(flow_file)])
        if filter_pattern:
            # mitmdump accepts a filter expression as a positional argument;
            # "flow_filter" is not a mitmproxy option.
            args.append(filter_pattern)
# Add script for automatic logging
script_path = self._create_logging_script(output_dir)
args.extend(["-s", str(script_path)])
logger.info(f"Starting mitmdump session {session_id} for {program_id}")
return {
'success': True,
'session_id': session_id,
'proxy_url': f"http://{listen_host}:{listen_port}",
'listen_host': listen_host,
'listen_port': listen_port,
'output_dir': str(output_dir),
'flow_file': str(flow_file) if flow_file else None,
'instructions': {
'manual_start': f"mitmdump {' '.join(args)}",
'configure_browser': f"Set HTTP/HTTPS proxy to {listen_host}:{listen_port}",
'certificate': "Install mitmproxy CA cert from ~/.mitmproxy/mitmproxy-ca-cert.pem",
'stop': "Press Ctrl+C to stop the proxy",
},
'note': "This returns configuration. Start mitmdump manually or use background mode."
}
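
    # A minimal sketch of launching the proxy in the background (an assumption:
    # this method only returns configuration and never spawns the process):
    #
    #     proc = await asyncio.create_subprocess_exec(
    #         "mitmdump", *args,
    #         stdout=asyncio.subprocess.DEVNULL,
    #         stderr=asyncio.subprocess.DEVNULL,
    #     )
    #
    # The caller would then own the process and terminate it when finished.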
async def analyze_traffic_flows(
self,
program_id: str,
flow_file: str,
target_filter: Optional[str] = None,
) -> Dict[str, Any]:
"""Analyze captured traffic flows from mitmdump.
Args:
program_id: Program identifier
flow_file: Path to .mitm flow file
target_filter: Optional filter for specific targets
Returns:
Dictionary with analysis results
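
        Example (a sketch; the instance name and flow-file path are
        illustrative assumptions):

            report = await proxy.analyze_traffic_flows(
                "example-program",
                "./data/proxy_sessions/example-program/ab12cd34/flows.mitm",
            )
            if report["success"]:
                print(report["unique_hosts"])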
"""
# Validate program
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
flow_path = Path(flow_file)
if not flow_path.exists():
return {'success': False, 'error': f"Flow file not found: {flow_file}"}
# Create analysis script
analysis_script = self._create_analysis_script(target_filter)
# Run mitmdump in reading mode with analysis script
args = [
"-nr", str(flow_path), # Read flows without starting proxy
"-s", str(analysis_script),
]
result = await self.executor.execute("mitmdump", args, timeout=300)
if result.success:
# Parse analysis output
analysis_data = self._parse_analysis_output(result.output)
return {
'success': True,
'flow_file': str(flow_file),
'analysis': analysis_data,
'total_requests': analysis_data.get('total_requests', 0),
'unique_hosts': analysis_data.get('unique_hosts', []),
'endpoints_found': analysis_data.get('endpoints', []),
'parameters_found': analysis_data.get('parameters', []),
'cookies_found': analysis_data.get('cookies', []),
'api_keys_found': analysis_data.get('api_keys', []),
}
else:
return {
'success': False,
'error': 'Failed to analyze traffic flows',
'details': result.errors,
}
async def extract_api_endpoints(
self,
program_id: str,
flow_file: str,
) -> Dict[str, Any]:
"""Extract API endpoints from captured traffic.
Args:
program_id: Program identifier
flow_file: Path to .mitm flow file
Returns:
Dictionary with extracted API endpoints
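
        Example (a sketch; names and paths are illustrative assumptions):

            result = await proxy.extract_api_endpoints(
                "example-program", "./flows.mitm"
            )
            for ep in result.get("in_scope_endpoints", []):
                print(ep["method"], ep["url"])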
"""
# Validate program
program = self.config.get_program(program_id)
if not program:
return {'success': False, 'error': f"Program '{program_id}' not found"}
validator = ScopeValidator(program)
flow_path = Path(flow_file)
if not flow_path.exists():
return {'success': False, 'error': f"Flow file not found: {flow_file}"}
# Create endpoint extraction script
extraction_script = self._create_endpoint_extraction_script()
args = [
"-nr", str(flow_path),
"-s", str(extraction_script),
]
result = await self.executor.execute("mitmdump", args, timeout=300)
if result.success:
endpoints = self._parse_endpoints(result.output)
# Filter to in-scope endpoints
in_scope_endpoints = []
out_scope_endpoints = []
for endpoint in endpoints:
is_valid, _ = validator.validate_url(endpoint['url'])
if is_valid:
in_scope_endpoints.append(endpoint)
else:
out_scope_endpoints.append(endpoint)
return {
'success': True,
'total_endpoints': len(endpoints),
'in_scope_endpoints': in_scope_endpoints,
'out_scope_endpoints': out_scope_endpoints,
'in_scope_count': len(in_scope_endpoints),
'out_scope_count': len(out_scope_endpoints),
}
else:
return {
'success': False,
'error': 'Failed to extract endpoints',
'details': result.errors,
}
def _create_logging_script(self, output_dir: Path) -> Path:
"""Create a mitmproxy script for logging traffic.
Args:
output_dir: Directory to save logs
Returns:
Path to created script
"""
        script_path = output_dir / "logger.py"
        log_path = output_dir / "traffic.log"
        # repr() keeps the embedded path a valid Python literal even if it
        # contains backslashes or quotes.
        script_content = '''"""Mitmproxy logging script."""
import json

from mitmproxy import http


class TrafficLogger:
    def __init__(self):
        self.log_file = open(''' + repr(str(log_path)) + ''', "w")

    def request(self, flow: http.HTTPFlow) -> None:
        """Log HTTP request."""
        log_entry = {
            "type": "request",
            "method": flow.request.method,
            "url": flow.request.pretty_url,
            "host": flow.request.host,
            "path": flow.request.path,
            "headers": dict(flow.request.headers),
            "timestamp": flow.request.timestamp_start,
        }
        self.log_file.write(json.dumps(log_entry) + "\\n")
        self.log_file.flush()

    def response(self, flow: http.HTTPFlow) -> None:
        """Log HTTP response."""
        log_entry = {
            "type": "response",
            "url": flow.request.pretty_url,
            "status_code": flow.response.status_code,
            "headers": dict(flow.response.headers),
            "timestamp": flow.response.timestamp_end,
        }
        self.log_file.write(json.dumps(log_entry) + "\\n")
        self.log_file.flush()

    def done(self):
        """Close the log file when mitmproxy shuts down."""
        self.log_file.close()


addons = [TrafficLogger()]
'''
with open(script_path, 'w') as f:
f.write(script_content)
return script_path
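
    # Illustrative shape of one line in the generated traffic.log (values are
    # made up; the file holds one JSON object per line):
    #
    #     {"type": "request", "method": "GET",
    #      "url": "https://example.com/api/v1/users", "host": "example.com",
    #      "path": "/api/v1/users", "headers": {"User-Agent": "..."},
    #      "timestamp": 1700000000.0}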
def _create_analysis_script(self, target_filter: Optional[str]) -> Path:
"""Create a mitmproxy script for traffic analysis.
Args:
target_filter: Optional filter for specific targets
Returns:
Path to created script
"""
script_path = Path("./cache/analysis_script.py")
script_path.parent.mkdir(parents=True, exist_ok=True)
filter_code = f'if "{target_filter}" not in flow.request.pretty_url:\n return' if target_filter else ''
script_content = f'''"""Mitmproxy analysis script."""
import json
import re
from mitmproxy import http
class TrafficAnalyzer:
def __init__(self):
self.requests = []
self.hosts = set()
self.endpoints = set()
self.parameters = set()
self.cookies = set()
self.api_keys = []
def request(self, flow: http.HTTPFlow) -> None:
"""Analyze HTTP request."""
{filter_code}
self.requests.append(flow.request.pretty_url)
self.hosts.add(flow.request.host)
self.endpoints.add(f"{{flow.request.method}} {{flow.request.path}}")
# Extract parameters
for param in flow.request.query.keys():
self.parameters.add(param)
# Extract cookies
if "cookie" in flow.request.headers:
cookies = flow.request.headers["cookie"].split(";")
for cookie in cookies:
if "=" in cookie:
self.cookies.add(cookie.split("=")[0].strip())
# Look for API keys in headers
for header, value in flow.request.headers.items():
if any(key in header.lower() for key in ["api", "key", "token", "auth"]):
self.api_keys.append({{
"header": header,
"value": value[:20] + "..." if len(value) > 20 else value,
"url": flow.request.pretty_url
}})
def done(self):
"""Print analysis results."""
results = {{
"total_requests": len(self.requests),
"unique_hosts": sorted(list(self.hosts)),
"endpoints": sorted(list(self.endpoints)),
"parameters": sorted(list(self.parameters)),
"cookies": sorted(list(self.cookies)),
"api_keys": self.api_keys,
}}
print(json.dumps(results, indent=2))
addons = [TrafficAnalyzer()]
'''
with open(script_path, 'w') as f:
f.write(script_content)
return script_path
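
    # Illustrative shape of the JSON the analysis script prints on shutdown,
    # which _parse_analysis_output consumes (values are made up):
    #
    #     {"total_requests": 42, "unique_hosts": ["api.example.com"],
    #      "endpoints": ["GET /api/v1/users"], "parameters": ["id", "page"],
    #      "cookies": ["session"], "api_keys": []}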
def _create_endpoint_extraction_script(self) -> Path:
"""Create a mitmproxy script for extracting API endpoints.
Returns:
Path to created script
"""
script_path = Path("./cache/endpoint_extraction.py")
script_path.parent.mkdir(parents=True, exist_ok=True)
script_content = '''"""Mitmproxy endpoint extraction script."""
import json
from mitmproxy import http
class EndpointExtractor:
def __init__(self):
self.endpoints = []
def request(self, flow: http.HTTPFlow) -> None:
"""Extract endpoint information."""
endpoint = {
"method": flow.request.method,
"url": flow.request.pretty_url,
"host": flow.request.host,
"path": flow.request.path,
"parameters": list(flow.request.query.keys()),
"has_body": len(flow.request.content) > 0,
}
# Check if it looks like an API endpoint
is_api = any([
"/api/" in flow.request.path,
"/v1/" in flow.request.path,
"/v2/" in flow.request.path,
flow.request.path.endswith(".json"),
"application/json" in flow.request.headers.get("content-type", ""),
])
endpoint["is_api"] = is_api
self.endpoints.append(endpoint)
def done(self):
"""Print extracted endpoints."""
for endpoint in self.endpoints:
print(json.dumps(endpoint))
addons = [EndpointExtractor()]
'''
with open(script_path, 'w') as f:
f.write(script_content)
return script_path
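
    # The extraction script emits one JSON object per line, which
    # _parse_endpoints reads back. An illustrative line (made-up values):
    #
    #     {"method": "GET", "url": "https://example.com/api/v1/users",
    #      "host": "example.com", "path": "/api/v1/users",
    #      "parameters": ["id"], "has_body": false, "is_api": true}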
def _parse_analysis_output(self, output: str) -> Dict[str, Any]:
"""Parse analysis script output.
Args:
output: Raw output from analysis script
Returns:
Parsed analysis data
"""
        try:
            # The analysis script prints a single JSON object on shutdown.
            return json.loads(output)
        except json.JSONDecodeError:
            # mitmdump may print extra lines around the JSON; fall back to the
            # outermost braces before giving up.
            start, end = output.find('{'), output.rfind('}')
            if start != -1 and end > start:
                try:
                    return json.loads(output[start:end + 1])
                except json.JSONDecodeError:
                    pass
            return {
                'total_requests': 0,
                'unique_hosts': [],
                'endpoints': [],
                'parameters': [],
                'cookies': [],
                'api_keys': [],
                'parse_error': 'Failed to parse analysis output',
                'raw_output': output[:500],
            }
def _parse_endpoints(self, output: str) -> List[Dict[str, Any]]:
"""Parse endpoint extraction output.
Args:
output: Raw output from endpoint extraction script
Returns:
List of extracted endpoints
"""
endpoints = []
for line in output.strip().split('\n'):
if line:
try:
endpoint = json.loads(line)
endpoints.append(endpoint)
except json.JSONDecodeError:
continue
return endpoints