# mitmproxy-mcp MCP Server
# by lucasoeth
# - mitmproxy-mcp
# - src
# - mitmproxy_mcp
import re
from typing import Any, Dict, List
# Regex signatures for known bot-protection vendors.
#
# Maps a vendor label to the list of regular expressions whose presence in
# headers, cookies or body text hints at that vendor. Vendor order matters to
# consumers that stop at the first match, so keep specific vendors ahead of
# the generic catch-all entry.
BOT_PROTECTION_SIGNATURES: Dict[str, List[str]] = {
    "Cloudflare": [
        r"cf-ray",              # Ray ID response header
        r"__cf_bm",             # Bot Management cookie
        r"cf_clearance",        # cookie issued after a passed challenge
        r"\"why_captcha\"",     # key seen in challenge responses
        r"challenge-platform",  # path used by challenge scripts
        r"turnstile\.js",       # Turnstile widget script
    ],
    "Akamai Bot Manager": [
        r"_abck=",       # Bot Manager cookie
        r"akam_",        # Akamai cookie prefix
        r"bm_sz",        # Bot Manager cookie
        r"sensor_data",  # bot detection payload
    ],
    "PerimeterX": [
        r"_px\d?=",     # _px, _px2, _px3 ... cookies
        r"px\.js",      # client script
        r"px-captcha",  # captcha container
    ],
    "DataDome": [
        r"datadome=",     # main cookie
        r"datadome\.js",  # client script
        r"_dd_s",         # session cookie
    ],
    "reCAPTCHA": [
        r"google\.com/recaptcha",
        r"recaptcha\.net",
        r"g-recaptcha",
    ],
    "hCaptcha": [
        r"hcaptcha\.com",
        r"h-captcha",
    ],
    "Generic Bot Detection": [
        r"bot=",                        # generic bot cookie
        r"captcha",                     # any captcha reference
        r"challenge",                   # any challenge reference
        r"detected automated traffic",  # common block-page wording
        r"verify you are human",        # common block-page wording
    ],
}
def extract_javascript(html_content: str) -> List[Dict[str, Any]]:
    """
    Extract JavaScript from HTML content and provide basic analysis.

    Args:
        html_content: Raw HTML text to scan for ``<script>`` elements.

    Returns:
        A list of dictionaries, inline scripts first, then external ones.
        Inline entries carry ``type``, ``index``, ``size``, ``content``
        (cut to 1000 characters plus a marker when longer) and ``summary``
        (the result of ``analyze_script``). External entries carry ``type``,
        ``index``, ``src`` and a ``suspicious`` flag set when the URL contains
        a protection-related keyword. ``index`` is the position among all
        regex matches of that kind, so inline indices may have gaps where
        empty script bodies were skipped.
    """
    max_content_length = 1000
    scripts = []

    # HTML tag and attribute names are case-insensitive, so <SCRIPT SRC=...>
    # must be recognised just like <script src=...>.
    inline_pattern = r'<script[^>]*>(.*?)</script>'
    inline_scripts = re.findall(inline_pattern, html_content, re.DOTALL | re.IGNORECASE)
    for i, script in enumerate(inline_scripts):
        # Tags that only reference an external file have an empty body.
        if not script.strip():
            continue
        # Only append the marker when something was actually cut off.
        if len(script) <= max_content_length:
            content = script
        else:
            content = script[:max_content_length] + "... [truncated]"
        scripts.append({
            "type": "inline",
            "index": i,
            "size": len(script),
            "content": content,
            "summary": analyze_script(script),
        })

    # Extract external script references (quoted src attributes only).
    src_pattern = r'<script[^>]*src=[\'"]([^\'"]+)[\'"][^>]*>'
    external_scripts = re.findall(src_pattern, html_content, re.IGNORECASE)
    suspicious_terms = (
        "captcha", "challenge", "bot", "protect", "security",
        "verify", "check", "shield", "defend", "guard",
    )
    for i, src in enumerate(external_scripts):
        lowered_src = src.lower()
        scripts.append({
            "type": "external",
            "index": i,
            "src": src,
            "suspicious": any(term in lowered_src for term in suspicious_terms),
        })

    return scripts
def analyze_script(script: str) -> Dict[str, Any]:
    """
    Inspect JavaScript source text for signs of bot-protection logic.

    Returns a dictionary with:
        potential_protection: True when more than two fingerprinting or
            token indicators fired, the script looks obfuscated, or a
            suspiciously named function was found.
        fingerprinting_indicators: labels of matched fingerprinting patterns.
        token_generation_indicators: labels of matched token patterns.
        obfuscation_level: "none", "medium" or "high".
        key_functions: names of declared functions containing a suspicious term.
    """

    def _labels_of_matches(table):
        # Case-insensitive search; table order is preserved in the result.
        return [label for regex, label in table if re.search(regex, script, re.IGNORECASE)]

    fingerprint_table = [
        (r'navigator\.', "Browser navigator object"),
        (r'screen\.', "Screen properties"),
        (r'canvas', "Canvas fingerprinting"),
        (r'webgl', "WebGL fingerprinting"),
        (r'font', "Font enumeration"),
        (r'audio', "Audio fingerprinting"),
        (r'plugins', "Plugin enumeration"),
        (r'User-Agent', "User-Agent checking"),
        (r'platform', "Platform detection"),
    ]
    token_table = [
        (r'(token|captcha|challenge|clearance)', "Token/challenge reference"),
        (r'(generate|calculate|compute)', "Computation terms"),
        (r'(Math\.random|crypto)', "Random generation"),
        (r'(cookie|setCookie|document\.cookie)', "Cookie manipulation"),
        (r'(xhr|XMLHttpRequest|fetch)', "Request sending"),
    ]
    fingerprint_hits = _labels_of_matches(fingerprint_table)
    token_hits = _labels_of_matches(token_table)

    # Obfuscation heuristics: strong signals are tested first, then weaker ones.
    if (
        len(re.findall(r'eval\(', script)) > 3
        or len(re.findall(r'\\x[0-9a-f]{2}', script)) > 10
        or len(re.findall(r'String\.fromCharCode', script)) > 3
    ):
        level = "high"
    elif (
        re.search(r'function\(\w{1,2},\w{1,2},\w{1,2}\)\{', script)
        or script.count(';') > len(script) / 10
        or sum(len(word) > 30 for word in re.findall(r'\w+', script)) > 10
    ):
        level = "medium"
    else:
        level = "none"

    # Named function declarations whose name hints at protection logic.
    watch_words = ["challenge", "token", "captcha", "verify", "bot", "check", "security"]
    flagged_functions = [
        fn_name
        for fn_name in re.findall(r'function\s+(\w+)\s*\(', script)
        if any(word in fn_name.lower() for word in watch_words)
    ]

    looks_protective = (
        len(fingerprint_hits) > 2
        or len(token_hits) > 2
        or level != "none"
        or len(flagged_functions) > 0
    )

    return {
        "potential_protection": looks_protective,
        "fingerprinting_indicators": fingerprint_hits,
        "token_generation_indicators": token_hits,
        "obfuscation_level": level,
        "key_functions": flagged_functions,
    }
def analyze_cookies(headers: Dict[str, str]) -> List[Dict[str, Any]]:
    """
    Analyze cookies for common protection-related patterns.

    Args:
        headers: Header mapping. The ``Cookie`` value is used when present
            and non-empty, otherwise ``Set-Cookie``.

    Returns:
        One dictionary per ``name=value`` pair found after splitting the
        header on ``;``. Each has ``name``, ``value`` (cut to 50 characters
        plus a marker when longer), ``protection_related`` and ``vendor``
        (the first matching vendor in ``BOT_PROTECTION_SIGNATURES``, else
        ``"unknown"``). Segments without ``=`` are skipped.

    NOTE: because the header is split on ``;``, ``key=value`` attributes of
    a ``Set-Cookie`` header (e.g. ``Path=/``) are reported as entries too.
    """
    max_value_length = 50

    cookie_header = headers.get("Cookie", "") or headers.get("Set-Cookie", "")
    if not cookie_header:
        return []

    cookies = []
    for cookie_str in cookie_header.split(";"):
        name, separator, value = cookie_str.strip().partition("=")
        if not separator:
            continue
        name = name.strip()
        value = value.strip()

        # Several signatures (e.g. "_abck=", "datadome=") include the "="
        # that follows the cookie name. Matching against the bare name could
        # never hit them, so re-attach the separator before searching.
        probe = f"{name}="
        vendor = next(
            (
                vendor_name
                for vendor_name, signatures in BOT_PROTECTION_SIGNATURES.items()
                if any(re.search(sig, probe, re.IGNORECASE) for sig in signatures)
            ),
            None,
        )

        # Only append the marker when the value was actually shortened.
        if len(value) > max_value_length:
            value = value[:max_value_length] + "... [truncated]"

        cookies.append({
            "name": name,
            "value": value,
            "protection_related": vendor is not None,
            "vendor": vendor if vendor is not None else "unknown",
        })

    return cookies
def identify_protection_system(flow) -> List[Dict[str, Any]]:
    """
    Match a flow against the known bot-protection signatures.

    The request headers, the response headers and (for text, JavaScript or
    JSON content types) the decoded response body are concatenated into one
    text, which is then searched case-insensitively with every signature in
    ``BOT_PROTECTION_SIGNATURES``.

    Args:
        flow: Object exposing ``request.headers`` and an optional
            ``response`` with ``headers`` and ``content``
            (presumably a mitmproxy HTTP flow; confirm with callers).

    Returns:
        One entry per vendor with at least one hit, holding ``vendor``,
        ``confidence`` (percentage of that vendor's signatures that matched)
        and ``matching_signatures``, ordered by descending confidence.
    """
    fragments = [f"{k}: {v}\n" for k, v in flow.request.headers.items()]

    response = flow.response
    if response:
        fragments.extend(f"{k}: {v}\n" for k, v in response.headers.items())

        # Only textual bodies are worth scanning.
        content_type = response.headers.get("Content-Type", "")
        if any(marker in content_type for marker in ("text", "javascript", "json")):
            try:
                fragments.append(response.content.decode('utf-8', errors='ignore'))
            except Exception:
                # Best effort: an unreadable body just contributes nothing.
                pass

    haystack = "".join(fragments)

    detections = []
    for vendor, signatures in BOT_PROTECTION_SIGNATURES.items():
        hits = [sig for sig in signatures if re.search(sig, haystack, re.IGNORECASE)]
        if hits:
            detections.append({
                "vendor": vendor,
                "confidence": len(hits) / len(signatures) * 100,
                "matching_signatures": hits,
            })

    detections.sort(key=lambda entry: entry["confidence"], reverse=True)
    return detections
def analyze_response_for_challenge(flow) -> Dict[str, Any]:
    """
    Analyze a response to determine if it contains a challenge.

    Args:
        flow: Object exposing an optional ``response`` with ``status_code``,
            ``headers`` and ``content``
            (presumably a mitmproxy HTTP flow; confirm with callers).

    Returns:
        ``{"is_challenge": False}`` when the flow has no response. Otherwise
        a dictionary with ``is_challenge``, ``challenge_indicators`` (list of
        human-readable reasons), ``status_code`` and ``challenge_type``
        ("captcha", "javascript", "other", or "unknown" when nothing fired).
    """
    if not flow.response:
        return {"is_challenge": False}

    response = flow.response
    result = {
        "is_challenge": False,
        "challenge_indicators": [],
        "status_code": response.status_code,
        "challenge_type": "unknown"
    }
    indicators = result["challenge_indicators"]

    # Status codes commonly returned by blocking / rate-limiting layers.
    if response.status_code in [403, 429, 503]:
        indicators.append(f"Suspicious status code: {response.status_code}")

    # Check for challenge headers (names compared case-insensitively).
    challenge_headers = {
        "cf-mitigated": "Cloudflare mitigation",
        "cf-chl-bypass": "Cloudflare challenge bypass",
        "x-datadome": "DataDome protection",
        "x-px": "PerimeterX",
        "x-amz-captcha": "AWS WAF Captcha"
    }
    present_headers = {name.lower() for name in response.headers.keys()}
    for header, description in challenge_headers.items():
        if header.lower() in present_headers:
            indicators.append(f"Challenge header: {description}")

    # A response may carry no body at all (content is None); treat that as
    # empty text instead of failing on None.decode().
    raw_body = response.content
    content = raw_body.decode('utf-8', errors='ignore') if raw_body else ""

    # Check for challenge content patterns.
    challenge_patterns = [
        (r'captcha', "CAPTCHA"),
        (r'challenge', "Challenge term"),
        (r'blocked', "Blocking message"),
        (r'verify.*human', "Human verification"),
        (r'suspicious.*activity', "Suspicious activity message"),
        (r'security.*check', "Security check message"),
        (r'ddos', "DDoS protection message"),
        (r'automated.*request', "Automated request detection")
    ]
    for pattern, description in challenge_patterns:
        if re.search(pattern, content, re.IGNORECASE):
            indicators.append(f"Content indicator: {description}")

    # Any single indicator is enough to flag the response.
    result["is_challenge"] = len(indicators) > 0

    # Determine challenge type; CAPTCHA evidence wins over JavaScript.
    if "CAPTCHA" in " ".join(indicators):
        result["challenge_type"] = "captcha"
    elif "JavaScript" in content and result["is_challenge"]:
        result["challenge_type"] = "javascript"
    elif result["is_challenge"]:
        result["challenge_type"] = "other"

    return result
def generate_suggestions(analysis: Dict[str, Any]) -> List[str]:
    """
    Generate remediation suggestions based on the protection analysis.

    Args:
        analysis: Dictionary that may contain ``protection_systems`` (list of
            vendor entries, most confident first), ``challenge_analysis``,
            ``scripts`` and ``response_cookies``. Missing keys are treated as
            "nothing detected".

    Returns:
        Human-readable suggestion strings, empty when nothing was detected.
    """
    suggestions = []
    protection_systems = analysis.get("protection_systems")
    challenge_analysis = analysis.get("challenge_analysis", {})
    is_challenge = challenge_analysis.get("is_challenge", False)

    # Vendor-specific advice, only for a reasonably confident top match.
    if protection_systems:
        top_system = protection_systems[0]["vendor"]
        confidence = protection_systems[0]["confidence"]

        if confidence > 50:
            suggestions.append(f"Detected {top_system} with {confidence:.1f}% confidence.")

            if "Cloudflare" in top_system:
                suggestions.append("Cloudflare often uses JavaScript challenges. Check for cf_clearance cookie.")
                suggestions.append("Consider using proven techniques like cfscrape or cloudscraper libraries.")
            elif "Akamai" in top_system:
                suggestions.append("Akamai uses sensor_data for browser fingerprinting.")
                suggestions.append("Focus on _abck cookie which contains browser verification data.")
            elif "PerimeterX" in top_system:
                suggestions.append("PerimeterX relies on JavaScript execution and browser fingerprinting.")
                suggestions.append("Look for _px cookies which are essential for session validation.")
            elif "DataDome" in top_system:
                suggestions.append("DataDome uses advanced behavioral and fingerprinting techniques.")
                suggestions.append("The datadome cookie is critical for maintaining sessions.")
            elif "captcha" in top_system.lower():
                # Case-insensitive so that both "reCAPTCHA" and "hCaptcha" qualify.
                suggestions.append("This site uses CAPTCHA challenges which may require manual solving or specialized services.")

    # Advice based on the kind of challenge found in the response.
    if is_challenge:
        challenge_type = challenge_analysis.get("challenge_type", "unknown")

        if challenge_type == "javascript":
            suggestions.append("This response contains a JavaScript challenge that must be solved.")
            suggestions.append("Consider using a headless browser to execute the challenge JavaScript.")

            # If we have script analysis, add more specific suggestions.
            if "scripts" in analysis:
                summaries = [s.get("summary", {}) for s in analysis["scripts"]]

                obfuscated_count = sum(
                    1 for summary in summaries
                    if summary.get("obfuscation_level") in ["medium", "high"]
                )
                if obfuscated_count:
                    suggestions.append(f"Found {obfuscated_count} obfuscated script(s) that likely contain challenge logic.")

                techniques = set()
                for summary in summaries:
                    techniques.update(summary.get("fingerprinting_indicators") or [])
                if techniques:
                    # Sorted so the message is stable across runs (set order is not).
                    suggestions.append(f"Detected browser fingerprinting techniques: {', '.join(sorted(techniques))}.")

        elif challenge_type == "captcha":
            suggestions.append("This response contains a CAPTCHA challenge.")
            suggestions.append("Consider using a CAPTCHA solving service or manual intervention.")

    # Cookies flagged as protection-related must survive across requests.
    cookie_names = [
        c["name"] for c in analysis.get("response_cookies", [])
        if c.get("protection_related")
    ]
    if cookie_names:
        suggestions.append(f"Important protection cookies to maintain: {', '.join(cookie_names)}.")

    # General suggestions whenever anything at all was detected.
    if protection_systems or is_challenge:
        suggestions.extend([
            "General recommendations:",
            "- Maintain consistent User-Agent between requests",
            "- Preserve all cookies from the session",
            "- Add appropriate referer and origin headers",
            "- Consider adding delays between requests to avoid rate limiting",
            "- Use rotating IP addresses if available",
        ])

    return suggestions