mitmproxy-mcp MCP Server

import os import json from typing import Any, Dict, List, Union from mitmproxy import io # Directory where mitmproxy dump files are stored DUMP_DIR = "/Users/lucas/Coding/mitmproxy-mcp/dumps" # Cache for storing flows per session FLOW_CACHE = {} async def get_flows_from_dump(session_id: str) -> list: """ Retrieves flows from the dump file, using the cache if available. """ dump_file = os.path.join(DUMP_DIR, f"{session_id}.dump") if not os.path.exists(dump_file): raise FileNotFoundError("Session not found") if session_id in FLOW_CACHE: return FLOW_CACHE[session_id] else: with open(dump_file, "rb") as f: reader = io.FlowReader(f) flows = list(reader.stream()) FLOW_CACHE[session_id] = flows return flows def parse_json_content(content: bytes, headers: dict) -> Union[Dict, str, bytes]: """ Attempts to parse content as JSON if the content type indicates JSON. Returns the parsed JSON or the raw content if parsing fails. """ content_type = headers.get("Content-Type", "").lower() if headers else "" if "application/json" in content_type or "text/json" in content_type: try: return json.loads(content.decode(errors="ignore")) except json.JSONDecodeError: return content.decode(errors="ignore") return content.decode(errors="ignore")