Skip to main content
Glama
lucasoeth

mitmproxy-mcp MCP Server

by lucasoeth

analyze_protection

Analyze HTTP traffic flows to detect bot protection mechanisms and extract challenge details for security testing and analysis.

Instructions

Analyze flow for bot protection mechanisms and extract challenge details

Input Schema

Table | JSON Schema
| Name | Required | Description | Default |
| --- | --- | --- | --- |
| session_id | Yes | The ID of the session | |
| flow_index | Yes | The index of the flow to analyze | |
| extract_scripts | No | Whether to extract and analyze JavaScript from the response (default: true) | |

Implementation Reference

  • The main handler function for the 'analyze_protection' tool. It retrieves the specified flow from the session, performs analysis using helper functions for protection systems, cookies, challenges, scripts, and generates suggestions, returning a JSON summary.
    async def analyze_protection(arguments: dict) -> list[types.TextContent]:
        """
        Analyze a flow for bot protection mechanisms and extract challenge details.

        Reads ``session_id``, ``flow_index`` and the optional ``extract_scripts``
        flag (treated as True when absent) from ``arguments``. The result is
        always a one-element list holding a single text item: either the
        analysis serialized as indented JSON, or a message starting with
        "Error".
        """

        def _text(message: str) -> list[types.TextContent]:
            # Every outcome, success or failure, is reported as one text item.
            return [types.TextContent(type="text", text=message)]

        session_id = arguments.get("session_id")
        flow_index = arguments.get("flow_index")
        extract_scripts = arguments.get("extract_scripts", True)

        # Guard clauses for the two required arguments. ``flow_index`` is
        # compared against None because 0 is a legitimate index.
        if not session_id:
            return _text("Error: Missing session_id")
        if flow_index is None:
            return _text("Error: Missing flow_index")

        try:
            captured = await get_flows_from_dump(session_id)

            # The inner try maps an IndexError raised from here on to the
            # "out of range" message; errors from loading the dump are handled
            # by the outer handlers only.
            try:
                flow = captured[flow_index]
                if flow.type != "http":
                    return _text(f"Error: Flow {flow_index} is not an HTTP flow")

                # Request-side findings; key order is the JSON output order.
                report = {
                    "flow_index": flow_index,
                    "method": flow.request.method,
                    "url": flow.request.url,
                    "protection_systems": identify_protection_system(flow),
                    "request_cookies": analyze_cookies(dict(flow.request.headers)),
                    "has_response": flow.response is not None,
                }

                response = flow.response
                if response:
                    # Response-side findings.
                    content_type = response.headers.get("Content-Type", "")
                    is_html = "text/html" in content_type

                    report["status_code"] = response.status_code
                    report["response_cookies"] = analyze_cookies(dict(response.headers))
                    report["challenge_analysis"] = analyze_response_for_challenge(flow)
                    report["content_type"] = content_type
                    report["is_html"] = is_html

                    # Script extraction is opt-out and only applies to HTML;
                    # a failure is recorded in the report rather than raised.
                    if is_html and extract_scripts:
                        try:
                            html_content = response.content.decode('utf-8', errors='ignore')
                            report["scripts"] = extract_javascript(html_content)
                        except Exception as e:
                            report["script_extraction_error"] = str(e)

                # Suggestions are derived from everything collected so far.
                report["suggestions"] = generate_suggestions(report)

                return _text(json.dumps(report, indent=2))
            except IndexError:
                return _text(f"Error: Flow index {flow_index} out of range")
        except FileNotFoundError:
            return _text("Error: Session not found")
        except Exception as e:
            return _text(f"Error analyzing protection: {str(e)}")
  • The JSON schema definition and registration of the 'analyze_protection' tool in the list_tools handler, specifying input parameters.
    # Tool registration entry for 'analyze_protection'.
    types.Tool(
        name="analyze_protection",
        description="Analyze flow for bot protection mechanisms and extract challenge details",
        # JSON Schema describing the arguments accepted by the tool.
        inputSchema={
            "type": "object",
            "properties": {
                "session_id": {
                    "type": "string",
                    "description": "The ID of the session"
                },
                "flow_index": {
                    "type": "integer",
                    "description": "The index of the flow to analyze"
                },
                # Optional flag; the schema declares True as its default.
                "extract_scripts": {
                    "type": "boolean",
                    "description": "Whether to extract and analyze JavaScript from the response (default: true)",
                    "default": True
                }
            },
            # extract_scripts is deliberately absent here, so it may be omitted.
            "required": ["session_id", "flow_index"]
        }
    )
  • The dispatch logic in the call_tool handler that routes requests for 'analyze_protection' to the implementation function.
    # Route calls for the 'analyze_protection' tool to its handler coroutine.
    elif name == "analyze_protection": return await analyze_protection(arguments)
  • Helper function to identify bot protection systems by matching signatures in headers and content against known patterns.
    def identify_protection_system(flow) -> List[Dict[str, Any]]:
        """
        Identify potential bot protection systems based on signatures.

        Builds one searchable text from the request headers and, when a
        response exists, its headers plus its body (body only for content types
        containing "text", "javascript" or "json"). Every pattern in
        BOT_PROTECTION_SIGNATURES is then searched case-insensitively in that
        text.

        Returns one entry per vendor with at least one matching pattern,
        holding the vendor, the percentage of its patterns that matched
        ("confidence") and the matching patterns, ordered by descending
        confidence.
        """
        # Collect the searchable pieces and join them once at the end.
        fragments = [f"{name}: {value}\n" for name, value in flow.request.headers.items()]

        response = flow.response
        if response:
            fragments.extend(f"{name}: {value}\n" for name, value in response.headers.items())

            # Only textual bodies are worth scanning.
            content_type = response.headers.get("Content-Type", "")
            if any(marker in content_type for marker in ("text", "javascript", "json")):
                try:
                    fragments.append(response.content.decode('utf-8', errors='ignore'))
                except Exception:
                    # Best effort: a body that cannot be read is left out of
                    # the search instead of aborting the analysis.
                    pass

        haystack = "".join(fragments)

        # Score each vendor by the share of its signatures found.
        protections = []
        for vendor, signatures in BOT_PROTECTION_SIGNATURES.items():
            matches = [sig for sig in signatures if re.search(sig, haystack, re.IGNORECASE)]
            if not matches:
                continue
            protections.append({
                "vendor": vendor,
                "confidence": len(matches) / len(signatures) * 100,
                "matching_signatures": matches
            })

        protections.sort(key=lambda entry: entry["confidence"], reverse=True)
        return protections
  • Helper function to analyze the response for challenge presence based on status codes, headers, and content patterns.
    def analyze_response_for_challenge(flow) -> Dict[str, Any]:
        """
        Analyze a response to determine if it contains a challenge.

        Args:
            flow: Object exposing ``response``; the response must provide
                ``status_code``, ``headers`` (with ``keys()``) and ``content``
                (bytes, or None when there is no body).

        Returns:
            ``{"is_challenge": False}`` when the flow has no response.
            Otherwise a dict with:
              - "is_challenge": True when at least one indicator was found.
              - "challenge_indicators": human-readable findings, in the order
                status code, headers, content patterns.
              - "status_code": the response status code.
              - "challenge_type": "captcha", "javascript", "other", or
                "unknown" when nothing was detected.
        """
        if not flow.response:
            return {"is_challenge": False}

        response = flow.response
        indicators = []
        result = {
            "is_challenge": False,
            "challenge_indicators": indicators,
            "status_code": response.status_code,
            "challenge_type": "unknown"
        }

        # Check status code
        if response.status_code in [403, 429, 503]:
            indicators.append(f"Suspicious status code: {response.status_code}")

        # Check for challenge headers (exact, case-insensitive name match).
        challenge_headers = {
            "cf-mitigated": "Cloudflare mitigation",
            "cf-chl-bypass": "Cloudflare challenge bypass",
            "x-datadome": "DataDome protection",
            "x-px": "PerimeterX",
            "x-amz-captcha": "AWS WAF Captcha"
        }
        # Lowercase the response header names once instead of once per lookup.
        present_headers = {name.lower() for name in response.headers.keys()}
        for header, description in challenge_headers.items():
            if header.lower() in present_headers:
                indicators.append(f"Challenge header: {description}")

        # Check for challenge content patterns. A response without a body has
        # content None; treat it as empty text so the status and header
        # findings above are still reported instead of raising AttributeError.
        content = (response.content or b"").decode('utf-8', errors='ignore')
        challenge_patterns = [
            (r'captcha', "CAPTCHA"),
            (r'challenge', "Challenge term"),
            (r'blocked', "Blocking message"),
            (r'verify.*human', "Human verification"),
            (r'suspicious.*activity', "Suspicious activity message"),
            (r'security.*check', "Security check message"),
            (r'ddos', "DDoS protection message"),
            (r'automated.*request', "Automated request detection")
        ]
        for pattern, description in challenge_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                indicators.append(f"Content indicator: {description}")

        # Determine if this is a challenge response
        result["is_challenge"] = len(indicators) > 0

        # Determine challenge type. Both checks are case-sensitive: only the
        # "CAPTCHA" content indicator and a literal "JavaScript" in the body
        # qualify.
        if "CAPTCHA" in " ".join(indicators):
            result["challenge_type"] = "captcha"
        elif "JavaScript" in content and result["is_challenge"]:
            result["challenge_type"] = "javascript"
        elif result["is_challenge"]:
            result["challenge_type"] = "other"

        return result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lucasoeth/mitmproxy-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.