mitmproxy-mcp MCP Server

import re from typing import Any, Dict, List # Known bot protection systems and their signatures BOT_PROTECTION_SIGNATURES = { "Cloudflare": [ r"cf-ray", # Cloudflare Ray ID header r"__cf_bm", # Cloudflare Bot Management cookie r"cf_clearance", # Cloudflare challenge clearance cookie r"\"why_captcha\"", # Common in Cloudflare challenge responses r"challenge-platform", # Used in challenge scripts r"turnstile\.js", # Cloudflare Turnstile ], "Akamai Bot Manager": [ r"_abck=", # Akamai Bot Manager cookie r"akam_", # Akamai cookie prefix r"bm_sz", # Bot Manager cookie r"sensor_data", # Bot detection data ], "PerimeterX": [ r"_px\d?=", # PerimeterX cookies r"px\.js", # PerimeterX script r"px-captcha", # PerimeterX captcha ], "DataDome": [ r"datadome=", # DataDome cookie r"datadome\.js", # DataDome script r"_dd_s", # DataDome session cookie ], "reCAPTCHA": [ r"google\.com/recaptcha", r"recaptcha\.net", r"g-recaptcha", ], "hCaptcha": [ r"hcaptcha\.com", r"h-captcha", ], "Generic Bot Detection": [ r"bot=", # Generic bot cookie r"captcha", # Generic captcha reference r"challenge", # Generic challenge term r"detected automated traffic", # Common message r"verify you are human", # Common message ] } def extract_javascript(html_content: str) -> List[Dict[str, Any]]: """ Extract JavaScript from HTML content and provide basic analysis. Returns list of dictionaries with script info. """ scripts = [] # Extract inline scripts inline_pattern = r'<script[^>]*>(.*?)</script>' inline_scripts = re.findall(inline_pattern, html_content, re.DOTALL) for i, script in enumerate(inline_scripts): if len(script.strip()) > 0: script_info = { "type": "inline", "index": i, "size": len(script), "content": script if len(script) < 1000 else script[:1000] + "... [truncated]", "summary": analyze_script(script) } scripts.append(script_info) # Extract external script references src_pattern = r'<script[^>]*src=[\'"]([^\'"]+)[\'"][^>]*>' external_scripts = re.findall(src_pattern, html_content) for i, src in enumerate(external_scripts): script_info = { "type": "external", "index": i, "src": src, "suspicious": any(term in src.lower() for term in [ "captcha", "challenge", "bot", "protect", "security", "verify", "check", "shield", "defend", "guard" ]) } scripts.append(script_info) return scripts def analyze_script(script: str) -> Dict[str, Any]: """ Analyze JavaScript content for common protection patterns. """ analysis = { "potential_protection": False, "fingerprinting_indicators": [], "token_generation_indicators": [], "obfuscation_level": "none", "key_functions": [] } # Check for fingerprinting techniques fingerprinting_patterns = [ (r'navigator\.', "Browser navigator object"), (r'screen\.', "Screen properties"), (r'canvas', "Canvas fingerprinting"), (r'webgl', "WebGL fingerprinting"), (r'font', "Font enumeration"), (r'audio', "Audio fingerprinting"), (r'plugins', "Plugin enumeration"), (r'User-Agent', "User-Agent checking"), (r'platform', "Platform detection") ] for pattern, description in fingerprinting_patterns: if re.search(pattern, script, re.IGNORECASE): analysis["fingerprinting_indicators"].append(description) # Check for token generation token_patterns = [ (r'(token|captcha|challenge|clearance)', "Token/challenge reference"), (r'(generate|calculate|compute)', "Computation terms"), (r'(Math\.random|crypto)', "Random generation"), (r'(cookie|setCookie|document\.cookie)', "Cookie manipulation"), (r'(xhr|XMLHttpRequest|fetch)', "Request sending") ] for pattern, description in token_patterns: if re.search(pattern, script, re.IGNORECASE): analysis["token_generation_indicators"].append(description) # Check for common obfuscation techniques if len(re.findall(r'eval\(', script)) > 3: analysis["obfuscation_level"] = "high" elif len(re.findall(r'\\x[0-9a-f]{2}', script)) > 10: analysis["obfuscation_level"] = "high" elif len(re.findall(r'String\.fromCharCode', script)) > 3: analysis["obfuscation_level"] = "high" elif re.search(r'function\(\w{1,2},\w{1,2},\w{1,2}\)\{', script): analysis["obfuscation_level"] = "medium" elif sum(1 for c in script if c == ';') > len(script) / 10: analysis["obfuscation_level"] = "medium" elif sum(len(w) > 30 for w in re.findall(r'\w+', script)) > 10: analysis["obfuscation_level"] = "medium" # Extract potential key function names function_pattern = r'function\s+(\w+)\s*\(' functions = re.findall(function_pattern, script) suspicious_terms = ["challenge", "token", "captcha", "verify", "bot", "check", "security"] for func in functions: if any(term in func.lower() for term in suspicious_terms): analysis["key_functions"].append(func) # Determine if this is potentially protection-related analysis["potential_protection"] = ( len(analysis["fingerprinting_indicators"]) > 2 or len(analysis["token_generation_indicators"]) > 2 or analysis["obfuscation_level"] != "none" or len(analysis["key_functions"]) > 0 ) return analysis def analyze_cookies(headers: Dict[str, str]) -> List[Dict[str, Any]]: """ Analyze cookies for common protection-related patterns. """ cookie_header = headers.get("Cookie", "") or headers.get("Set-Cookie", "") if not cookie_header: return [] # Split multiple cookies cookies = [] for cookie_str in cookie_header.split(";"): parts = cookie_str.strip().split("=", 1) if len(parts) == 2: name, value = parts cookie = { "name": name.strip(), "value": value.strip() if len(value.strip()) < 50 else value.strip()[:50] + "... [truncated]", "protection_related": False, "vendor": "unknown" } # Check if this is a known protection cookie for vendor, signatures in BOT_PROTECTION_SIGNATURES.items(): for sig in signatures: if re.search(sig, name, re.IGNORECASE): cookie["protection_related"] = True cookie["vendor"] = vendor break if cookie["protection_related"]: break cookies.append(cookie) return cookies def identify_protection_system(flow) -> List[Dict[str, Any]]: """ Identify potential bot protection systems based on signatures. """ protections = [] # Combine all searchable content searchable_content = "" # Add request headers for k, v in flow.request.headers.items(): searchable_content += f"{k}: {v}\n" # Check response if available if flow.response: # Add response headers for k, v in flow.response.headers.items(): searchable_content += f"{k}: {v}\n" # Add response content if it's text content_type = flow.response.headers.get("Content-Type", "") if "text" in content_type or "javascript" in content_type or "json" in content_type: try: searchable_content += flow.response.content.decode('utf-8', errors='ignore') except Exception: pass # Check for protection signatures for vendor, signatures in BOT_PROTECTION_SIGNATURES.items(): matches = [] for sig in signatures: if re.search(sig, searchable_content, re.IGNORECASE): matches.append(sig) if matches: protections.append({ "vendor": vendor, "confidence": len(matches) / len(signatures) * 100, "matching_signatures": matches }) return sorted(protections, key=lambda x: x["confidence"], reverse=True) def analyze_response_for_challenge(flow) -> Dict[str, Any]: """ Analyze a response to determine if it contains a challenge. """ if not flow.response: return {"is_challenge": False} result = { "is_challenge": False, "challenge_indicators": [], "status_code": flow.response.status_code, "challenge_type": "unknown" } # Check status code if flow.response.status_code in [403, 429, 503]: result["challenge_indicators"].append(f"Suspicious status code: {flow.response.status_code}") # Check for challenge headers challenge_headers = { "cf-mitigated": "Cloudflare mitigation", "cf-chl-bypass": "Cloudflare challenge bypass", "x-datadome": "DataDome protection", "x-px": "PerimeterX", "x-amz-captcha": "AWS WAF Captcha" } for header, description in challenge_headers.items(): if any(h.lower() == header.lower() for h in flow.response.headers.keys()): result["challenge_indicators"].append(f"Challenge header: {description}") # Check for challenge content patterns content = flow.response.content.decode('utf-8', errors='ignore') challenge_patterns = [ (r'captcha', "CAPTCHA"), (r'challenge', "Challenge term"), (r'blocked', "Blocking message"), (r'verify.*human', "Human verification"), (r'suspicious.*activity', "Suspicious activity message"), (r'security.*check', "Security check message"), (r'ddos', "DDoS protection message"), (r'automated.*request', "Automated request detection") ] for pattern, description in challenge_patterns: if re.search(pattern, content, re.IGNORECASE): result["challenge_indicators"].append(f"Content indicator: {description}") # Determine if this is a challenge response result["is_challenge"] = len(result["challenge_indicators"]) > 0 # Determine challenge type if "CAPTCHA" in " ".join(result["challenge_indicators"]): result["challenge_type"] = "captcha" elif "JavaScript" in content and result["is_challenge"]: result["challenge_type"] = "javascript" elif result["is_challenge"]: result["challenge_type"] = "other" return result def generate_suggestions(analysis: Dict[str, Any]) -> List[str]: """ Generate remediation suggestions based on the protection analysis. """ suggestions = [] # Check if any protection system was detected if analysis.get("protection_systems"): top_system = analysis["protection_systems"][0]["vendor"] confidence = analysis["protection_systems"][0]["confidence"] if confidence > 50: suggestions.append(f"Detected {top_system} with {confidence:.1f}% confidence.") # Add vendor-specific suggestions if "Cloudflare" in top_system: suggestions.append("Cloudflare often uses JavaScript challenges. Check for cf_clearance cookie.") suggestions.append("Consider using proven techniques like cfscrape or cloudscraper libraries.") elif "Akamai" in top_system: suggestions.append("Akamai uses sensor_data for browser fingerprinting.") suggestions.append("Focus on _abck cookie which contains browser verification data.") elif "PerimeterX" in top_system: suggestions.append("PerimeterX relies on JavaScript execution and browser fingerprinting.") suggestions.append("Look for _px cookies which are essential for session validation.") elif "DataDome" in top_system: suggestions.append("DataDome uses advanced behavioral and fingerprinting techniques.") suggestions.append("The datadome cookie is critical for maintaining sessions.") elif "CAPTCHA" in top_system: suggestions.append("This site uses CAPTCHA challenges which may require manual solving or specialized services.") # Add suggestions based on challenge type if analysis.get("challenge_analysis", {}).get("is_challenge", False): challenge_type = analysis.get("challenge_analysis", {}).get("challenge_type", "unknown") if challenge_type == "javascript": suggestions.append("This response contains a JavaScript challenge that must be solved.") suggestions.append("Consider using a headless browser to execute the challenge JavaScript.") # If we have script analysis, add more specific suggestions if "scripts" in analysis: obfuscated_scripts = [s for s in analysis["scripts"] if s.get("summary", {}).get("obfuscation_level") in ["medium", "high"]] if obfuscated_scripts: suggestions.append(f"Found {len(obfuscated_scripts)} obfuscated script(s) that likely contain challenge logic.") fingerprinting_scripts = [s for s in analysis["scripts"] if s.get("summary", {}).get("fingerprinting_indicators")] if fingerprinting_scripts: techniques = set() for script in fingerprinting_scripts: techniques.update(script.get("summary", {}).get("fingerprinting_indicators", [])) suggestions.append(f"Detected browser fingerprinting techniques: {', '.join(techniques)}.") elif challenge_type == "captcha": suggestions.append("This response contains a CAPTCHA challenge.") suggestions.append("Consider using a CAPTCHA solving service or manual intervention.") # Check for important cookies protection_cookies = [c for c in analysis.get("response_cookies", []) if c.get("protection_related")] if protection_cookies: cookie_names = [c["name"] for c in protection_cookies] suggestions.append(f"Important protection cookies to maintain: {', '.join(cookie_names)}.") # General suggestions if analysis.get("protection_systems") or analysis.get("challenge_analysis", {}).get("is_challenge", False): suggestions.append("General recommendations:") suggestions.append("- Maintain consistent User-Agent between requests") suggestions.append("- Preserve all cookies from the session") suggestions.append("- Add appropriate referer and origin headers") suggestions.append("- Consider adding delays between requests to avoid rate limiting") suggestions.append("- Use rotating IP addresses if available") return suggestions