mitmproxy-mcp MCP Server

by lucasoeth

analyze_protection

Analyze HTTP traffic flows to detect bot protection mechanisms and extract challenge details for security testing and analysis.

Instructions

Analyze flow for bot protection mechanisms and extract challenge details

Input Schema

Name             Required  Description                                                                  Default
session_id       Yes       The ID of the session
flow_index       Yes       The index of the flow to analyze
extract_scripts  No        Whether to extract and analyze JavaScript from the response (default: true)  true
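
As a rough illustration of the input schema, the sketch below checks an arguments dictionary the same way the handler does before any flow is loaded. The helper name `check_arguments` and the sample session ID are hypothetical and are not part of mitmproxy-mcp.

```python
from typing import Optional


def check_arguments(arguments: dict) -> Optional[str]:
    """Return an error message for a missing required field, or None if the input is usable.

    Hypothetical helper: it mirrors the handler's own checks but is not part of the server.
    """
    if not arguments.get("session_id"):
        return "Error: Missing session_id"
    # flow_index 0 is a valid index, so compare against None rather than truthiness
    if arguments.get("flow_index") is None:
        return "Error: Missing flow_index"
    return None


# Example input: extract_scripts is optional and falls back to True when omitted
example = {"session_id": "example-session", "flow_index": 0}
extract_scripts = example.get("extract_scripts", True)
print(check_arguments(example), extract_scripts)
```

Testing `flow_index` against `None` matters because the first flow in a session has index 0, which a plain truthiness check would reject.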

Implementation Reference

  • The main handler function for the 'analyze_protection' tool. It retrieves the specified flow from the session, runs helper functions that analyze protection systems, cookies, challenges, and scripts, generates suggestions from the findings, and returns the result as a JSON summary.
    async def analyze_protection(arguments: dict) -> list[types.TextContent]:
        """
        Analyze a flow for bot protection mechanisms and extract challenge details.
        """
        session_id = arguments.get("session_id")
        flow_index = arguments.get("flow_index")
        extract_scripts = arguments.get("extract_scripts", True)
        
        if not session_id:
            return [types.TextContent(type="text", text="Error: Missing session_id")]
        if flow_index is None:
            return [types.TextContent(type="text", text="Error: Missing flow_index")]
        
        try:
            flows = await get_flows_from_dump(session_id)
            
            try:
                flow = flows[flow_index]
                
                if flow.type != "http":
                    return [types.TextContent(type="text", text=f"Error: Flow {flow_index} is not an HTTP flow")]
                
                # Analyze the flow for protection mechanisms
                analysis = {
                    "flow_index": flow_index,
                    "method": flow.request.method,
                    "url": flow.request.url,
                    "protection_systems": identify_protection_system(flow),
                    "request_cookies": analyze_cookies(dict(flow.request.headers)),
                    "has_response": flow.response is not None,
                }
                
                if flow.response:
                    # Add response analysis
                    content_type = flow.response.headers.get("Content-Type", "")
                    is_html = "text/html" in content_type
                    
                    analysis.update({
                        "status_code": flow.response.status_code,
                        "response_cookies": analyze_cookies(dict(flow.response.headers)),
                        "challenge_analysis": analyze_response_for_challenge(flow),
                        "content_type": content_type,
                        "is_html": is_html,
                    })
                    
                    # If HTML and script extraction is requested, extract and analyze JavaScript
                    if is_html and extract_scripts:
                        try:
                            html_content = flow.response.content.decode('utf-8', errors='ignore')
                            analysis["scripts"] = extract_javascript(html_content)
                        except Exception as e:
                            analysis["script_extraction_error"] = str(e)
                
                # Add remediation suggestions based on findings
                analysis["suggestions"] = generate_suggestions(analysis)
                
                return [types.TextContent(type="text", text=json.dumps(analysis, indent=2))]
                
            except IndexError:
                return [types.TextContent(type="text", text=f"Error: Flow index {flow_index} out of range")]
                
        except FileNotFoundError:
            return [types.TextContent(type="text", text="Error: Session not found")]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Error analyzing protection: {str(e)}")]
  • The JSON schema definition and registration of the 'analyze_protection' tool in the list_tools handler, specifying input parameters.
    types.Tool(
        name="analyze_protection",
        description="Analyze flow for bot protection mechanisms and extract challenge details",
        inputSchema={
            "type": "object",
            "properties": {
                "session_id": {
                    "type": "string",
                    "description": "The ID of the session"
                },
                "flow_index": {
                    "type": "integer",
                    "description": "The index of the flow to analyze"
                },
                "extract_scripts": {
                    "type": "boolean",
                    "description": "Whether to extract and analyze JavaScript from the response (default: true)",
                    "default": True
                }
            },
            "required": ["session_id", "flow_index"]
        }
    )
  • The dispatch logic in the call_tool handler that routes requests for 'analyze_protection' to the implementation function.
    elif name == "analyze_protection":
        return await analyze_protection(arguments)
  • Helper function to identify bot protection systems by matching signatures in headers and content against known patterns.
    def identify_protection_system(flow) -> List[Dict[str, Any]]:
        """
        Identify potential bot protection systems based on signatures.
        """
        protections = []
        
        # Combine all searchable content
        searchable_content = ""
        # Add request headers
        for k, v in flow.request.headers.items():
            searchable_content += f"{k}: {v}\n"
        
        # Check response if available
        if flow.response:
            # Add response headers
            for k, v in flow.response.headers.items():
                searchable_content += f"{k}: {v}\n"
            
            # Add response content if it's text
            content_type = flow.response.headers.get("Content-Type", "")
            if "text" in content_type or "javascript" in content_type or "json" in content_type:
                try:
                    searchable_content += flow.response.content.decode('utf-8', errors='ignore')
                except Exception:
                    pass
        
        # Check for protection signatures
        for vendor, signatures in BOT_PROTECTION_SIGNATURES.items():
            matches = []
            for sig in signatures:
                if re.search(sig, searchable_content, re.IGNORECASE):
                    matches.append(sig)
            
            if matches:
                protections.append({
                    "vendor": vendor,
                    "confidence": len(matches) / len(signatures) * 100,
                    "matching_signatures": matches
                })
        
        return sorted(protections, key=lambda x: x["confidence"], reverse=True)
  • Helper function to analyze the response for challenge presence based on status codes, headers, and content patterns.
    def analyze_response_for_challenge(flow) -> Dict[str, Any]:
        """
        Analyze a response to determine if it contains a challenge.
        """
        if not flow.response:
            return {"is_challenge": False}
        
        result = {
            "is_challenge": False,
            "challenge_indicators": [],
            "status_code": flow.response.status_code,
            "challenge_type": "unknown"
        }
        
        # Check status code
        if flow.response.status_code in [403, 429, 503]:
            result["challenge_indicators"].append(f"Suspicious status code: {flow.response.status_code}")
        
        # Check for challenge headers
        challenge_headers = {
            "cf-mitigated": "Cloudflare mitigation",
            "cf-chl-bypass": "Cloudflare challenge bypass",
            "x-datadome": "DataDome protection",
            "x-px": "PerimeterX",
            "x-amz-captcha": "AWS WAF Captcha"
        }
        
        for header, description in challenge_headers.items():
            if any(h.lower() == header.lower() for h in flow.response.headers.keys()):
                result["challenge_indicators"].append(f"Challenge header: {description}")
        
        # Check for challenge content patterns (content can be None when no body was captured)
        content = (flow.response.content or b"").decode('utf-8', errors='ignore')
        challenge_patterns = [
            (r'captcha', "CAPTCHA"),
            (r'challenge', "Challenge term"),
            (r'blocked', "Blocking message"),
            (r'verify.*human', "Human verification"),
            (r'suspicious.*activity', "Suspicious activity message"),
            (r'security.*check', "Security check message"),
            (r'ddos', "DDoS protection message"),
            (r'automated.*request', "Automated request detection")
        ]
        
        for pattern, description in challenge_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                result["challenge_indicators"].append(f"Content indicator: {description}")
        
        # Determine if this is a challenge response
        result["is_challenge"] = len(result["challenge_indicators"]) > 0
        
        # Determine challenge type
        if "CAPTCHA" in " ".join(result["challenge_indicators"]):
            result["challenge_type"] = "captcha"
        elif "JavaScript" in content and result["is_challenge"]:
            result["challenge_type"] = "javascript"
        elif result["is_challenge"]:
            result["challenge_type"] = "other"
        
        return result
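
The confidence value reported by identify_protection_system is the percentage of a vendor's signatures that match the combined headers and body. The sketch below reproduces that scoring on a plain string so it can be run without mitmproxy. The signature table, vendor names, and the function name `score_signatures` are made up for illustration; the real BOT_PROTECTION_SIGNATURES table is not shown in the reference above.

```python
import re
from typing import Any, Dict, List

# Hypothetical signature table; the real BOT_PROTECTION_SIGNATURES is not shown above.
EXAMPLE_SIGNATURES: Dict[str, List[str]] = {
    "ExampleVendorA": [r"x-vendor-a", r"vendor-a-challenge", r"_va_token", r"va\.js"],
    "ExampleVendorB": [r"x-vendor-b", r"vb_session"],
}


def score_signatures(text: str, table: Dict[str, List[str]]) -> List[Dict[str, Any]]:
    """Score each vendor by the percentage of its signatures found in text."""
    results = []
    for vendor, signatures in table.items():
        # Case-insensitive regex search, as in the helper above
        matches = [sig for sig in signatures if re.search(sig, text, re.IGNORECASE)]
        if matches:
            results.append({
                "vendor": vendor,
                "confidence": len(matches) / len(signatures) * 100,
                "matching_signatures": matches,
            })
    # Highest confidence first
    return sorted(results, key=lambda r: r["confidence"], reverse=True)


sample = "X-Vendor-A: 1\nX-Vendor-B: 1\nSet-Cookie: vb_session=abc\n"
for entry in score_signatures(sample, EXAMPLE_SIGNATURES):
    print(entry["vendor"], entry["confidence"])
```

Because the score is a ratio, a vendor with only a few signatures can reach a high confidence on little evidence, so the matching_signatures list is worth reading alongside the number.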

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lucasoeth/mitmproxy-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.