# mitmproxy-mcp MCP Server
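
An MCP server that exposes mitmproxy capture files to LLM clients. It reads flows from dump files named `<session_id>.dump` under a hard-coded `DUMP_DIR`, caches them per session, and exposes four tools: `list_flows` (method, URL, and status summaries), `get_flow_details` (headers and bodies, with structure previews for large JSON), `extract_json_fields` (JSONPath extraction from request or response bodies), and `analyze_protection` (detection of bot-protection systems and challenge analysis).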

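A capture session is just a mitmproxy dump file placed in `DUMP_DIR`; one way to produce one is `mitmdump -w dumps/<session_id>.dump`. The full server implementation follows.
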
```python
import asyncio
import json
import os
import re
from typing import Any, Dict, List, Union

from mitmproxy import io
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio

# Directory where mitmproxy dump files are stored
DUMP_DIR = "/Users/lucas/Coding/mitmproxy-mcp/dumps"

server = Server("mitmproxy-mcp")

# Cache for storing flows per session
FLOW_CACHE = {}

# Maximum content size in bytes before switching to structure preview
MAX_CONTENT_SIZE = 2000

# Known bot protection systems and their signatures
BOT_PROTECTION_SIGNATURES = {
    "Cloudflare": [
        r"cf-ray",              # Cloudflare Ray ID header
        r"__cf_bm",             # Cloudflare Bot Management cookie
        r"cf_clearance",        # Cloudflare challenge clearance cookie
        r"\"why_captcha\"",     # Common in Cloudflare challenge responses
        r"challenge-platform",  # Used in challenge scripts
        r"turnstile\.js",       # Cloudflare Turnstile
    ],
    "Akamai Bot Manager": [
        r"_abck=",       # Akamai Bot Manager cookie
        r"akam_",        # Akamai cookie prefix
        r"bm_sz",        # Bot Manager cookie
        r"sensor_data",  # Bot detection data
    ],
    "PerimeterX": [
        r"_px\d?=",     # PerimeterX cookies
        r"px\.js",      # PerimeterX script
        r"px-captcha",  # PerimeterX captcha
    ],
    "DataDome": [
        r"datadome=",     # DataDome cookie
        r"datadome\.js",  # DataDome script
        r"_dd_s",         # DataDome session cookie
    ],
    "reCAPTCHA": [
        r"google\.com/recaptcha",
        r"recaptcha\.net",
        r"g-recaptcha",
    ],
    "hCaptcha": [
        r"hcaptcha\.com",
        r"h-captcha",
    ],
    "Generic Bot Detection": [
        r"bot=",                        # Generic bot cookie
        r"captcha",                     # Generic captcha reference
        r"challenge",                   # Generic challenge term
        r"detected automated traffic",  # Common message
        r"verify you are human",        # Common message
    ],
}


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """
    List available tools.
    Each tool specifies its arguments using JSON Schema validation.
    """
    return [
        types.Tool(
            name="list_flows",
            description="Lists HTTP requests/responses from a mitmproxy capture session, showing method, URL, and status codes",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "The ID of the session to list flows from"
                    }
                },
                "required": ["session_id"]
            }
        ),
        types.Tool(
            name="get_flow_details",
            description="Retrieves detailed HTTP request/response data including headers, content (or structure preview for large JSON), and metadata from specified flows",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "The ID of the session"
                    },
                    "flow_indexes": {
                        "type": "array",
                        "items": {"type": "integer"},
                        "description": "The indexes of the flows"
                    },
                    "include_content": {
                        "type": "boolean",
                        "description": "Whether to include full content in the response (default: true)",
                        "default": True
                    }
                },
                "required": ["session_id", "flow_indexes"]
            }
        ),
        types.Tool(
            name="extract_json_fields",
            description="Extract specific fields from JSON content in a flow using JSONPath expressions",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "The ID of the session"
                    },
                    "flow_index": {
                        "type": "integer",
                        "description": "The index of the flow"
                    },
                    "content_type": {
                        "type": "string",
                        "enum": ["request", "response"],
                        "description": "Whether to extract from request or response content"
                    },
                    "json_paths": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "JSONPath expressions to extract (e.g. ['$.data.users', '$.metadata.timestamp'])"
                    }
                },
                "required": ["session_id", "flow_index", "content_type", "json_paths"]
            }
        ),
        types.Tool(
            name="analyze_protection",
            description="Analyze flow for bot protection mechanisms and extract challenge details",
            inputSchema={
                "type": "object",
                "properties": {
                    "session_id": {
                        "type": "string",
                        "description": "The ID of the session"
                    },
                    "flow_index": {
                        "type": "integer",
                        "description": "The index of the flow to analyze"
                    },
                    "extract_scripts": {
                        "type": "boolean",
                        "description": "Whether to extract and analyze JavaScript from the response (default: true)",
                        "default": True
                    }
                },
                "required": ["session_id", "flow_index"]
            }
        )
    ]


async def get_flows_from_dump(session_id: str) -> list:
    """
    Retrieves flows from the dump file, using the cache if available.
    """
    dump_file = os.path.join(DUMP_DIR, f"{session_id}.dump")
    if not os.path.exists(dump_file):
        raise FileNotFoundError("Session not found")

    if session_id in FLOW_CACHE:
        return FLOW_CACHE[session_id]

    with open(dump_file, "rb") as f:
        reader = io.FlowReader(f)
        flows = list(reader.stream())
        FLOW_CACHE[session_id] = flows
        return flows


async def list_flows(arguments: dict) -> list[types.TextContent]:
    """
    Lists HTTP flows from a mitmproxy dump file.
    """
    session_id = arguments.get("session_id")
    if not session_id:
        return [types.TextContent(type="text", text="Error: Missing session_id")]

    try:
        flows = await get_flows_from_dump(session_id)
        flow_list = []
        for i, flow in enumerate(flows):
            if flow.type == "http":
                request = flow.request
                response = flow.response
                flow_info = {
                    "index": i,
                    "method": request.method,
                    "url": request.url,
                    "status": response.status_code if response else None
                }
                flow_list.append(flow_info)
        return [types.TextContent(type="text", text=json.dumps(flow_list, indent=2))]
    except FileNotFoundError:
        return [types.TextContent(type="text", text="Error: Session not found")]
    except Exception as e:
        return [types.TextContent(type="text", text=f"Error reading flows: {str(e)}")]


def generate_json_structure(json_data: Any, max_depth: int = 2, current_depth: int = 0) -> Any:
    """
    Generate a simplified structure of JSON content, showing keys and types
    but replacing actual values with type indicators after a certain depth.
    """
    if current_depth >= max_depth:
        if isinstance(json_data, dict):
            return {"...": f"{len(json_data)} keys"}
        elif isinstance(json_data, list):
            return f"[{len(json_data)} items]"
        else:
            return f"({type(json_data).__name__})"

    if isinstance(json_data, dict):
        result = {}
        for key, value in json_data.items():
            result[key] = generate_json_structure(value, max_depth, current_depth + 1)
        return result
    elif isinstance(json_data, list):
        if not json_data:
            return []
        # For lists, show structure of first item and count
        sample = generate_json_structure(json_data[0], max_depth, current_depth + 1)
        return [sample, f"... ({len(json_data) - 1} more items)"] if len(json_data) > 1 else [sample]
    else:
        return f"({type(json_data).__name__})"


def parse_json_content(content: bytes, headers: dict) -> Union[Dict, List, str]:
    """
    Attempts to parse content as JSON if the content type indicates JSON.
    Returns the parsed JSON, or the decoded text if parsing fails.
    """
    # Header names may arrive in any case (HTTP/2 lowercases them), so look
    # the Content-Type up case-insensitively.
    content_type = ""
    if headers:
        content_type = next(
            (v for k, v in headers.items() if k.lower() == "content-type"), ""
        ).lower()
    if "application/json" in content_type or "text/json" in content_type:
        try:
            return json.loads(content.decode(errors="ignore"))
        except json.JSONDecodeError:
            return content.decode(errors="ignore")
    return content.decode(errors="ignore")


def extract_with_jsonpath(json_data: Any, path: str) -> Any:
    """
    Basic implementation of JSONPath extraction.
    Supports simple dot notation and array indexing (e.g. "$.data.users[0]").
    For more complex cases, consider using a full JSONPath library.
    """
    # Handle root object reference
    if path == "$":
        return json_data

    # Strip leading $ if present
    if path.startswith("$"):
        path = path[1:]
    if path.startswith("."):
        path = path[1:]

    # Parse the path - handle both dot notation and brackets
    parts = []
    current = ""
    in_brackets = False

    for char in path:
        if char == "[":
            if current:
                parts.append(current)
                current = ""
            in_brackets = True
        elif char == "]":
            if in_brackets:
                try:
                    # Handle array index
                    parts.append(int(current.strip()))
                except ValueError:
                    # Handle quoted key
                    quoted = current.strip()
                    if (quoted.startswith("'") and quoted.endswith("'")) or \
                       (quoted.startswith('"') and quoted.endswith('"')):
                        parts.append(quoted[1:-1])
                    else:
                        parts.append(quoted)
                current = ""
                in_brackets = False
        elif char == "." and not in_brackets:
            if current:
                parts.append(current)
                current = ""
        else:
            current += char

    if current:
        parts.append(current)

    # Navigate through the data
    result = json_data
    for part in parts:
        try:
            if isinstance(result, dict):
                result = result.get(part)
            elif isinstance(result, list) and isinstance(part, int):
                if 0 <= part < len(result):
                    result = result[part]
                else:
                    return None
            else:
                return None
            if result is None:
                break
        except Exception:
            return None

    return result

async def get_flow_details(arguments: dict) -> list[types.TextContent]:
    """
    Gets details of specific flows from a mitmproxy dump file.
    For large JSON content, returns a structure preview instead of full content.
    """
    session_id = arguments.get("session_id")
    flow_indexes = arguments.get("flow_indexes")
    include_content = arguments.get("include_content", True)

    if not session_id:
        return [types.TextContent(type="text", text="Error: Missing session_id")]
    if not flow_indexes:
        return [types.TextContent(type="text", text="Error: Missing flow_indexes")]

    try:
        flows = await get_flows_from_dump(session_id)
        flow_details_list = []
        for flow_index in flow_indexes:
            try:
                flow = flows[flow_index]
                if flow.type == "http":
                    request = flow.request
                    response = flow.response

                    # Parse content
                    request_content = parse_json_content(request.content, dict(request.headers))
                    response_content = None
                    if response:
                        response_content = parse_json_content(response.content, dict(response.headers))

                    # Handle large content
                    request_content_preview = None
                    response_content_preview = None
                    flow_details = {}

                    # Check if request content is large and is JSON (object or array)
                    if include_content and len(request.content) > MAX_CONTENT_SIZE and isinstance(request_content, (dict, list)):
                        request_content_preview = generate_json_structure(request_content)
                        request_content = None  # Don't include full content
                    elif include_content and len(request.content) > MAX_CONTENT_SIZE:
                        if isinstance(request_content, str):
                            request_content = request_content[:MAX_CONTENT_SIZE] + " ...[truncated]"
                        else:
                            request_content = request_content[:MAX_CONTENT_SIZE].decode(errors="ignore") + " ...[truncated]"
                        flow_details["request_content_note"] = f"Request content truncated to {MAX_CONTENT_SIZE} bytes."

                    # Check if response content is large and is JSON (object or array)
                    if response and include_content and len(response.content) > MAX_CONTENT_SIZE and isinstance(response_content, (dict, list)):
                        response_content_preview = generate_json_structure(response_content)
                        response_content = None  # Don't include full content
                    elif response and include_content and len(response.content) > MAX_CONTENT_SIZE:
                        if isinstance(response_content, str):
                            response_content = response_content[:MAX_CONTENT_SIZE] + " ...[truncated]"
                        else:
                            response_content = response_content[:MAX_CONTENT_SIZE].decode(errors="ignore") + " ...[truncated]"
                        flow_details["response_content_note"] = f"Response content truncated to {MAX_CONTENT_SIZE} bytes."

                    # Build flow details
                    flow_details.update({
                        "index": flow_index,
                        "method": request.method,
                        "url": request.url,
                        "request_headers": dict(request.headers),
                        "status": response.status_code if response else None,
                        "response_headers": dict(response.headers) if response else None,
                    })

                    # Add content or previews based on size
                    if include_content:
                        if request_content is not None:
                            flow_details["request_content"] = request_content
                        if request_content_preview is not None:
                            flow_details["request_content_preview"] = request_content_preview
                            flow_details["request_content_size"] = len(request.content)
                            flow_details["request_content_note"] = "Content too large to display. Use extract_json_fields tool to get specific values."
                        if response_content is not None:
                            flow_details["response_content"] = response_content
                        if response_content_preview is not None:
                            flow_details["response_content_preview"] = response_content_preview
                            flow_details["response_content_size"] = len(response.content) if response else 0
                            flow_details["response_content_note"] = "Content too large to display. Use extract_json_fields tool to get specific values."

                    flow_details_list.append(flow_details)
                else:
                    flow_details_list.append({"error": f"Flow {flow_index} is not an HTTP flow"})
            except IndexError:
                flow_details_list.append({"error": f"Flow index {flow_index} out of range"})

        return [types.TextContent(type="text", text=json.dumps(flow_details_list, indent=2))]
    except FileNotFoundError:
        return [types.TextContent(type="text", text="Error: Session not found")]
    except Exception as e:
        return [types.TextContent(type="text", text=f"Error reading flow details: {str(e)}")]


async def extract_json_fields(arguments: dict) -> list[types.TextContent]:
    """
    Extract specific fields from JSON content in a flow using JSONPath expressions.
    """
    session_id = arguments.get("session_id")
    flow_index = arguments.get("flow_index")
    content_type = arguments.get("content_type")
    json_paths = arguments.get("json_paths")

    if not session_id:
        return [types.TextContent(type="text", text="Error: Missing session_id")]
    if flow_index is None:
        return [types.TextContent(type="text", text="Error: Missing flow_index")]
    if not content_type:
        return [types.TextContent(type="text", text="Error: Missing content_type")]
    if not json_paths:
        return [types.TextContent(type="text", text="Error: Missing json_paths")]

    try:
        flows = await get_flows_from_dump(session_id)
        try:
            flow = flows[flow_index]
            if flow.type != "http":
                return [types.TextContent(type="text", text=f"Error: Flow {flow_index} is not an HTTP flow")]

            request = flow.request
            response = flow.response

            # Determine which content to extract from
            content = None
            headers = None
            if content_type == "request":
                content = request.content
                headers = dict(request.headers)
            elif content_type == "response":
                if not response:
                    return [types.TextContent(type="text", text=f"Error: Flow {flow_index} has no response")]
                content = response.content
                headers = dict(response.headers)
            else:
                return [types.TextContent(type="text", text="Error: Invalid content_type. Must be 'request' or 'response'")]

            # Parse the content
            json_content = parse_json_content(content, headers)

            # Only extract from JSON content
            if not isinstance(json_content, (dict, list)):
                return [types.TextContent(type="text", text=f"Error: The {content_type} content is not valid JSON")]

            # Extract fields
            result = {}
            for path in json_paths:
                try:
                    extracted = extract_with_jsonpath(json_content, path)
                    result[path] = extracted
                except Exception as e:
                    result[path] = f"Error extracting path: {str(e)}"

            return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
        except IndexError:
            return [types.TextContent(type="text", text=f"Error: Flow index {flow_index} out of range")]
    except FileNotFoundError:
        return [types.TextContent(type="text", text="Error: Session not found")]
    except Exception as e:
        return [types.TextContent(type="text", text=f"Error extracting JSON fields: {str(e)}")]


def extract_javascript(html_content: str) -> List[Dict[str, Any]]:
    """
    Extract JavaScript from HTML content and provide basic analysis.
    Returns a list of dictionaries with script info.
    """
    scripts = []

    # Extract inline scripts
    inline_pattern = r'<script[^>]*>(.*?)</script>'
    inline_scripts = re.findall(inline_pattern, html_content, re.DOTALL)

    for i, script in enumerate(inline_scripts):
        if len(script.strip()) > 0:
            script_info = {
                "type": "inline",
                "index": i,
                "size": len(script),
                "content": script if len(script) < 1000 else script[:1000] + "... [truncated]",
                "summary": analyze_script(script)
            }
            scripts.append(script_info)

    # Extract external script references
    src_pattern = r'<script[^>]*src=[\'"]([^\'"]+)[\'"][^>]*>'
    external_scripts = re.findall(src_pattern, html_content)

    for i, src in enumerate(external_scripts):
        script_info = {
            "type": "external",
            "index": i,
            "src": src,
            "suspicious": any(term in src.lower() for term in [
                "captcha", "challenge", "bot", "protect", "security",
                "verify", "check", "shield", "defend", "guard"
            ])
        }
        scripts.append(script_info)

    return scripts


def analyze_script(script: str) -> Dict[str, Any]:
    """
    Analyze JavaScript content for common protection patterns.
    """
    analysis = {
        "potential_protection": False,
        "fingerprinting_indicators": [],
        "token_generation_indicators": [],
        "obfuscation_level": "none",
        "key_functions": []
    }

    # Check for fingerprinting techniques
    fingerprinting_patterns = [
        (r'navigator\.', "Browser navigator object"),
        (r'screen\.', "Screen properties"),
        (r'canvas', "Canvas fingerprinting"),
        (r'webgl', "WebGL fingerprinting"),
        (r'font', "Font enumeration"),
        (r'audio', "Audio fingerprinting"),
        (r'plugins', "Plugin enumeration"),
        (r'User-Agent', "User-Agent checking"),
        (r'platform', "Platform detection")
    ]

    for pattern, description in fingerprinting_patterns:
        if re.search(pattern, script, re.IGNORECASE):
            analysis["fingerprinting_indicators"].append(description)

    # Check for token generation
    token_patterns = [
        (r'(token|captcha|challenge|clearance)', "Token/challenge reference"),
        (r'(generate|calculate|compute)', "Computation terms"),
        (r'(Math\.random|crypto)', "Random generation"),
        (r'(cookie|setCookie|document\.cookie)', "Cookie manipulation"),
        (r'(xhr|XMLHttpRequest|fetch)', "Request sending")
    ]

    for pattern, description in token_patterns:
        if re.search(pattern, script, re.IGNORECASE):
            analysis["token_generation_indicators"].append(description)

    # Check for common obfuscation techniques
    if len(re.findall(r'eval\(', script)) > 3:
        analysis["obfuscation_level"] = "high"
    elif len(re.findall(r'\\x[0-9a-f]{2}', script)) > 10:
        analysis["obfuscation_level"] = "high"
    elif len(re.findall(r'String\.fromCharCode', script)) > 3:
        analysis["obfuscation_level"] = "high"
    elif re.search(r'function\(\w{1,2},\w{1,2},\w{1,2}\)\{', script):
        analysis["obfuscation_level"] = "medium"
    elif sum(1 for c in script if c == ';') > len(script) / 10:
        analysis["obfuscation_level"] = "medium"
    elif sum(len(w) > 30 for w in re.findall(r'\w+', script)) > 10:
        analysis["obfuscation_level"] = "medium"

    # Extract potential key function names
    function_pattern = r'function\s+(\w+)\s*\('
    functions = re.findall(function_pattern, script)
    suspicious_terms = ["challenge", "token", "captcha", "verify", "bot", "check", "security"]
    for func in functions:
        if any(term in func.lower() for term in suspicious_terms):
            analysis["key_functions"].append(func)

    # Determine if this is potentially protection-related
    analysis["potential_protection"] = (
        len(analysis["fingerprinting_indicators"]) > 2 or
        len(analysis["token_generation_indicators"]) > 2 or
        analysis["obfuscation_level"] != "none" or
        len(analysis["key_functions"]) > 0
    )

    return analysis


def analyze_cookies(headers: Dict[str, str]) -> List[Dict[str, Any]]:
    """
    Analyze cookies for common protection-related patterns.
    """
    # Header names may arrive in any case, so look them up case-insensitively.
    cookie_header = (
        next((v for k, v in headers.items() if k.lower() == "cookie"), "")
        or next((v for k, v in headers.items() if k.lower() == "set-cookie"), "")
    )
    if not cookie_header:
        return []

    # Split multiple cookies
    cookies = []
    for cookie_str in cookie_header.split(";"):
        parts = cookie_str.strip().split("=", 1)
        if len(parts) == 2:
            name, value = parts
            cookie = {
                "name": name.strip(),
                "value": value.strip() if len(value.strip()) < 50 else value.strip()[:50] + "... [truncated]",
                "protection_related": False,
                "vendor": "unknown"
            }

            # Check if this is a known protection cookie
            for vendor, signatures in BOT_PROTECTION_SIGNATURES.items():
                for sig in signatures:
                    if re.search(sig, name, re.IGNORECASE):
                        cookie["protection_related"] = True
                        cookie["vendor"] = vendor
                        break
                if cookie["protection_related"]:
                    break

            cookies.append(cookie)

    return cookies


def identify_protection_system(flow) -> List[Dict[str, Any]]:
    """
    Identify potential bot protection systems based on signatures.
    """
    protections = []

    # Combine all searchable content
    searchable_content = ""

    # Add request headers
    for k, v in flow.request.headers.items():
        searchable_content += f"{k}: {v}\n"

    # Check response if available
    if flow.response:
        # Add response headers
        for k, v in flow.response.headers.items():
            searchable_content += f"{k}: {v}\n"

        # Add response content if it's text
        content_type = flow.response.headers.get("Content-Type", "")
        if "text" in content_type or "javascript" in content_type or "json" in content_type:
            try:
                searchable_content += flow.response.content.decode('utf-8', errors='ignore')
            except Exception:
                pass

    # Check for protection signatures
    for vendor, signatures in BOT_PROTECTION_SIGNATURES.items():
        matches = []
        for sig in signatures:
            if re.search(sig, searchable_content, re.IGNORECASE):
                matches.append(sig)
        if matches:
            protections.append({
                "vendor": vendor,
                "confidence": len(matches) / len(signatures) * 100,
                "matching_signatures": matches
            })

    return sorted(protections, key=lambda x: x["confidence"], reverse=True)


def analyze_response_for_challenge(flow) -> Dict[str, Any]:
    """
    Analyze a response to determine if it contains a challenge.
    """
    if not flow.response:
        return {"is_challenge": False}

    result = {
        "is_challenge": False,
        "challenge_indicators": [],
        "status_code": flow.response.status_code,
        "challenge_type": "unknown"
    }

    # Check status code
    if flow.response.status_code in [403, 429, 503]:
        result["challenge_indicators"].append(f"Suspicious status code: {flow.response.status_code}")

    # Check for challenge headers
    challenge_headers = {
        "cf-mitigated": "Cloudflare mitigation",
        "cf-chl-bypass": "Cloudflare challenge bypass",
        "x-datadome": "DataDome protection",
        "x-px": "PerimeterX",
        "x-amz-captcha": "AWS WAF Captcha"
    }
    for header, description in challenge_headers.items():
        if any(h.lower() == header.lower() for h in flow.response.headers.keys()):
            result["challenge_indicators"].append(f"Challenge header: {description}")

    # Check for challenge content patterns
    content = flow.response.content.decode('utf-8', errors='ignore')
    challenge_patterns = [
        (r'captcha', "CAPTCHA"),
        (r'challenge', "Challenge term"),
        (r'blocked', "Blocking message"),
        (r'verify.*human', "Human verification"),
        (r'suspicious.*activity', "Suspicious activity message"),
        (r'security.*check', "Security check message"),
        (r'ddos', "DDoS protection message"),
        (r'automated.*request', "Automated request detection")
    ]
    for pattern, description in challenge_patterns:
        if re.search(pattern, content, re.IGNORECASE):
            result["challenge_indicators"].append(f"Content indicator: {description}")

    # Determine if this is a challenge response
    result["is_challenge"] = len(result["challenge_indicators"]) > 0

    # Determine challenge type
    if "CAPTCHA" in " ".join(result["challenge_indicators"]):
        result["challenge_type"] = "captcha"
    elif "JavaScript" in content and result["is_challenge"]:
        result["challenge_type"] = "javascript"
    elif result["is_challenge"]:
        result["challenge_type"] = "other"

    return result


async def analyze_protection(arguments: dict) -> list[types.TextContent]:
    """
    Analyze a flow for bot protection mechanisms and extract challenge details.
    """
    session_id = arguments.get("session_id")
    flow_index = arguments.get("flow_index")
    extract_scripts = arguments.get("extract_scripts", True)

    if not session_id:
        return [types.TextContent(type="text", text="Error: Missing session_id")]
    if flow_index is None:
        return [types.TextContent(type="text", text="Error: Missing flow_index")]

    try:
        flows = await get_flows_from_dump(session_id)
        try:
            flow = flows[flow_index]
            if flow.type != "http":
                return [types.TextContent(type="text", text=f"Error: Flow {flow_index} is not an HTTP flow")]

            # Analyze the flow for protection mechanisms
            analysis = {
                "flow_index": flow_index,
                "method": flow.request.method,
                "url": flow.request.url,
                "protection_systems": identify_protection_system(flow),
                "request_cookies": analyze_cookies(dict(flow.request.headers)),
                "has_response": flow.response is not None,
            }

            if flow.response:
                # Add response analysis
                content_type = flow.response.headers.get("Content-Type", "")
                is_html = "text/html" in content_type

                analysis.update({
                    "status_code": flow.response.status_code,
                    "response_cookies": analyze_cookies(dict(flow.response.headers)),
                    "challenge_analysis": analyze_response_for_challenge(flow),
                    "content_type": content_type,
                    "is_html": is_html,
                })

                # If HTML and script extraction is requested, extract and analyze JavaScript
                if is_html and extract_scripts:
                    try:
                        html_content = flow.response.content.decode('utf-8', errors='ignore')
                        analysis["scripts"] = extract_javascript(html_content)
                    except Exception as e:
                        analysis["script_extraction_error"] = str(e)

            # Add remediation suggestions based on findings
            analysis["suggestions"] = generate_suggestions(analysis)

            return [types.TextContent(type="text", text=json.dumps(analysis, indent=2))]
        except IndexError:
            return [types.TextContent(type="text", text=f"Error: Flow index {flow_index} out of range")]
    except FileNotFoundError:
        return [types.TextContent(type="text", text="Error: Session not found")]
    except Exception as e:
        return [types.TextContent(type="text", text=f"Error analyzing protection: {str(e)}")]


def generate_suggestions(analysis: Dict[str, Any]) -> List[str]:
    """
    Generate remediation suggestions based on the protection analysis.
    """
    suggestions = []

    # Check if any protection system was detected
    if analysis.get("protection_systems"):
        top_system = analysis["protection_systems"][0]["vendor"]
        confidence = analysis["protection_systems"][0]["confidence"]

        if confidence > 50:
            suggestions.append(f"Detected {top_system} with {confidence:.1f}% confidence.")

            # Add vendor-specific suggestions
            if "Cloudflare" in top_system:
                suggestions.append("Cloudflare often uses JavaScript challenges. Check for cf_clearance cookie.")
                suggestions.append("Consider using proven techniques like cfscrape or cloudscraper libraries.")
            elif "Akamai" in top_system:
                suggestions.append("Akamai uses sensor_data for browser fingerprinting.")
                suggestions.append("Focus on the _abck cookie, which contains browser verification data.")
            elif "PerimeterX" in top_system:
                suggestions.append("PerimeterX relies on JavaScript execution and browser fingerprinting.")
                suggestions.append("Look for _px cookies, which are essential for session validation.")
            elif "DataDome" in top_system:
                suggestions.append("DataDome uses advanced behavioral and fingerprinting techniques.")
                suggestions.append("The datadome cookie is critical for maintaining sessions.")
            elif "captcha" in top_system.lower():  # matches reCAPTCHA and hCaptcha
                suggestions.append("This site uses CAPTCHA challenges which may require manual solving or specialized services.")

    # Add suggestions based on challenge type
    if analysis.get("challenge_analysis", {}).get("is_challenge", False):
        challenge_type = analysis.get("challenge_analysis", {}).get("challenge_type", "unknown")

        if challenge_type == "javascript":
            suggestions.append("This response contains a JavaScript challenge that must be solved.")
            suggestions.append("Consider using a headless browser to execute the challenge JavaScript.")

            # If we have script analysis, add more specific suggestions
            if "scripts" in analysis:
                obfuscated_scripts = [s for s in analysis["scripts"]
                                      if s.get("summary", {}).get("obfuscation_level") in ["medium", "high"]]
                if obfuscated_scripts:
                    suggestions.append(f"Found {len(obfuscated_scripts)} obfuscated script(s) that likely contain challenge logic.")

                fingerprinting_scripts = [s for s in analysis["scripts"]
                                          if s.get("summary", {}).get("fingerprinting_indicators")]
                if fingerprinting_scripts:
                    techniques = set()
                    for script in fingerprinting_scripts:
                        techniques.update(script.get("summary", {}).get("fingerprinting_indicators", []))
                    suggestions.append(f"Detected browser fingerprinting techniques: {', '.join(techniques)}.")
        elif challenge_type == "captcha":
            suggestions.append("This response contains a CAPTCHA challenge.")
            suggestions.append("Consider using a CAPTCHA solving service or manual intervention.")

    # Check for important cookies
    protection_cookies = [c for c in analysis.get("response_cookies", []) if c.get("protection_related")]
    if protection_cookies:
        cookie_names = [c["name"] for c in protection_cookies]
        suggestions.append(f"Important protection cookies to maintain: {', '.join(cookie_names)}.")

    # General suggestions
    if analysis.get("protection_systems") or analysis.get("challenge_analysis", {}).get("is_challenge", False):
        suggestions.append("General recommendations:")
        suggestions.append("- Maintain consistent User-Agent between requests")
        suggestions.append("- Preserve all cookies from the session")
        suggestions.append("- Add appropriate referer and origin headers")
        suggestions.append("- Consider adding delays between requests to avoid rate limiting")
        suggestions.append("- Use rotating IP addresses if available")

    return suggestions


@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    Handle tool execution requests.
    Delegates to specific functions based on the tool name.
    """
    if not arguments:
        raise ValueError("Missing arguments")

    if name == "list_flows":
        return await list_flows(arguments)
    elif name == "get_flow_details":
        return await get_flow_details(arguments)
    elif name == "extract_json_fields":
        return await extract_json_fields(arguments)
    elif name == "analyze_protection":
        return await analyze_protection(arguments)
    else:
        raise ValueError(f"Unknown tool: {name}")


async def main():
    # Run the server using stdin/stdout streams
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="mitmproxy-mcp",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
```