Skip to main content
Glama
http_server.py (104 kB)
import json import threading import urllib.parse from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Any import binaryninja as bn from binaryninja.settings import Settings from ..api.endpoints import BinaryNinjaEndpoints from ..core.binary_operations import BinaryOperations from ..core.config import Config from ..utils.number_utils import convert_number as util_convert_number from ..utils.string_utils import parse_int_or_default class MCPRequestHandler(BaseHTTPRequestHandler): binary_ops = None # Will be set by the server def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @property def endpoints(self): # Create endpoints on demand to ensure binary_ops is set if not hasattr(self, "_endpoints"): if not self.binary_ops: raise RuntimeError("binary_ops not initialized") self._endpoints = BinaryNinjaEndpoints(self.binary_ops) return self._endpoints def log_message(self, format, *args): bn.log_info(format % args) def _set_headers(self, content_type="application/json", status_code=200): try: self.send_response(status_code) self.send_header("Content-Type", content_type) self.send_header("Access-Control-Allow-Origin", "*") # Encourage clients to close promptly; reduces BrokenPipe on abrupt disconnects self.send_header("Connection", "close") self.end_headers() except (BrokenPipeError, OSError): try: import binaryninja as _bn _bn.log_warn("Client disconnected while sending headers") except Exception: pass def _send_json_response(self, data: dict[str, Any], status_code: int = 200): try: self._set_headers(status_code=status_code) # If headers failed due to disconnect, avoid writing body try: body = json.dumps(data).encode("utf-8") except Exception: body = b"{}" try: self.wfile.write(body) except (BrokenPipeError, OSError): try: import binaryninja as _bn _bn.log_warn("Client disconnected while sending body") except Exception: pass except Exception: # Last-resort swallow to avoid cascading errors on disconnects try: import binaryninja 
as _bn _bn.log_debug("Suppressed exception during response write") except Exception: pass def _parse_query_params(self) -> dict[str, str]: parsed_path = urllib.parse.urlparse(self.path) return dict(urllib.parse.parse_qsl(parsed_path.query)) def _parse_post_params(self) -> dict[str, Any]: """Parse POST request parameters from various formats. Supports: - JSON data (application/json) - Form data (application/x-www-form-urlencoded) - Raw text (text/plain) Returns: Dictionary containing the parsed parameters """ content_length = int(self.headers.get("Content-Length", 0)) if content_length == 0: return {} content_type = self.headers.get("Content-Type", "") post_data = self.rfile.read(content_length).decode("utf-8") bn.log_info(f"Received POST data: {post_data}") bn.log_info(f"Content-Type: {content_type}") # Handle JSON data if "application/json" in content_type.lower(): try: return json.loads(post_data) except json.JSONDecodeError as e: bn.log_error(f"Failed to parse JSON: {e}") return {"error": "Invalid JSON format"} # Handle form data if "application/x-www-form-urlencoded" in content_type.lower(): try: return dict(urllib.parse.parse_qsl(post_data)) except Exception as e: bn.log_error(f"Failed to parse form data: {e}") return {"error": "Invalid form data format"} # Handle raw text if "text/plain" in content_type.lower() or not content_type: return {"name": post_data.strip()} # ---------- Helpers ---------- def _resolve_name_to_address(self, ident: str): """Resolve a symbol name or hex address string to (address:int, label:str). 
Tries, in order: - Parse hex address (with or without 0x) - get_symbol_by_raw_name - get_symbol_by_name - scan data_vars for matching symbol name/raw_name """ bv = getattr(self.binary_ops, "current_view", None) if not bv: return None, None s = (ident or "").strip() # Hex address try: if s.lower().startswith("0x"): return int(s, 16), s # bare hex if all(c in "0123456789abcdefABCDEF" for c in s): return int(s, 16), s except Exception: pass # Raw name try: get_raw = getattr(bv, "get_symbol_by_raw_name", None) sym = get_raw(s) if callable(get_raw) else None if sym and hasattr(sym, "address"): return int(sym.address), getattr(sym, "name", s) except Exception: pass # Pretty name try: get_by_name = getattr(bv, "get_symbol_by_name", None) sym = get_by_name(s) if callable(get_by_name) else None if sym and hasattr(sym, "address"): return int(sym.address), getattr(sym, "name", s) except Exception: pass # Heuristic: BN auto-generated data labels like data_100003f66, byte_..., word_..., dword_..., qword_..., off_..., unk_... 
try: import re as _re m = _re.match(r"^(?i)(?:data|byte|word|dword|qword|off|unk)_(?:0x)?([0-9a-fA-F]+)$", s) if m: a = int(m.group(1), 16) return a, s except Exception: pass # Scan data vars try: for var in list(bv.data_vars): try: sy = bv.get_symbol_at(var) if not sy: continue if getattr(sy, "name", None) == s or getattr(sy, "raw_name", None) == s: return int(var), getattr(sy, "name", s) except Exception: continue except Exception: pass return None, None def _c_escape(self, raw: bytes, limit: int | None = None) -> str: """Escape bytes as a C string literal.""" try: b = raw if limit is None else raw[:limit] out = [] for ch in b: if ch == 0x22: # '"' out.append('\\"') elif ch == 0x5C: # '\\' out.append("\\\\") elif 32 <= ch <= 126: out.append(chr(ch)) elif ch == 0x0A: out.append("\\n") elif ch == 0x0D: out.append("\\r") elif ch == 0x09: out.append("\\t") else: out.append(f"\\x{ch:02x}") return '"' + "".join(out) + '"' except Exception: return '""' def _check_binary_loaded(self): """Check if a binary is loaded and return appropriate error response if not""" if not self.binary_ops or not self.binary_ops.current_view: self._send_json_response({"error": "No binary loaded"}, 400) return False return True def do_GET(self): try: # For all endpoints except /status, /convertNumber, /platforms, /binaries, /views, /selectBinary, check loaded if ( not ( self.path.startswith("/status") or self.path.startswith("/convertNumber") or self.path.startswith("/platforms") or self.path.startswith("/binaries") or self.path.startswith("/views") or self.path.startswith("/selectBinary") ) and not self._check_binary_loaded() ): return params = self._parse_query_params() path = urllib.parse.urlparse(self.path).path offset = parse_int_or_default(params.get("offset"), 0) # Support both `limit` and `count` (alias) for pagination if params.get("count") is not None: limit = parse_int_or_default(params.get("count"), 100) else: limit = parse_int_or_default(params.get("limit"), 100) if path == 
"/status": status = { "loaded": self.binary_ops and self.binary_ops.current_view is not None, "filename": self.binary_ops.current_view.file.filename if self.binary_ops and self.binary_ops.current_view else None, } self._send_json_response(status) elif path == "/functions" or path == "/methods": functions = self.binary_ops.get_function_names(offset, limit) bn.log_info(f"Found {len(functions)} functions") self._send_json_response({"functions": functions}) elif path == "/classes": classes = self.binary_ops.get_class_names(offset, limit) self._send_json_response({"classes": classes}) elif path == "/segments": segments = self.binary_ops.get_segments(offset, limit) self._send_json_response({"segments": segments}) elif path == "/imports": imports = self.endpoints.get_imports(offset, limit) self._send_json_response({"imports": imports}) elif path == "/binaries" or path == "/views": # List managed/open binaries self._send_json_response(self.endpoints.list_binaries()) elif path == "/selectBinary": ident = ( params.get("view") or params.get("binary") or params.get("id") or params.get("file") ) if not ident: self._send_json_response( { "error": "Missing parameter", "help": "Use ?view=<id|filename>", }, 400, ) else: self._send_json_response(self.endpoints.select_binary(ident)) elif path == "/exports": exports = self.endpoints.get_exports(offset, limit) self._send_json_response({"exports": exports}) elif path == "/sections": try: sections = self.binary_ops.get_sections(offset, limit) self._send_json_response({"sections": sections}) except Exception as e: bn.log_error(f"Error getting sections: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/entryPoints": try: eps = self.endpoints.get_entry_points() self._send_json_response({"entry_points": eps}) except Exception as e: bn.log_error(f"Error handling entryPoints: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/namespaces": namespaces = self.endpoints.get_namespaces(offset, limit) 
self._send_json_response({"namespaces": namespaces}) elif path == "/data": try: # length: desired byte count to read for preview; negative means "read exact defined size" length_param = params.get("length") preview_param = params.get("previewLen") if length_param is not None: read_len = parse_int_or_default(length_param, 32) elif preview_param is not None: read_len = parse_int_or_default(preview_param, 32) else: # Default: read exact defined size when available read_len = -1 data_items = self.binary_ops.get_defined_data(offset, limit, read_len) self._send_json_response({"data": data_items}) except Exception as e: bn.log_error(f"Error getting data items: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/localTypes": try: include_libs = params.get("includeLibraries") in ( "1", "true", "True", ) types = self.binary_ops.list_local_types( offset, limit, include_libraries=include_libs ) bn.log_info( f"/localTypes returned {len(types)} entries (offset={offset}, limit={limit})" ) self._send_json_response({"types": types}) except Exception as e: bn.log_error(f"Error listing local types: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/searchTypes": try: term = params.get("query") or params.get("q") if not term: self._send_json_response( { "error": "Missing query parameter", "help": "Required: query or q", }, 400, ) return # support count=-1 to return all eff_limit = ( -1 if (params.get("count") == "-1" or params.get("limit") == "-1") else limit ) include_libs = params.get("includeLibraries") in ( "1", "true", "True", ) # First compute total all_matches = self.binary_ops.search_local_types( term, 0, -1, include_libraries=include_libs ) page = ( all_matches[offset:] if eff_limit < 0 else all_matches[offset : offset + eff_limit] ) self._send_json_response( { "types": page, "query": term, "total": len(all_matches), "offset": offset, "limit": eff_limit, "includeLibraries": include_libs, } ) except Exception as e: bn.log_error(f"Error 
searching local types: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/strings": try: bn.log_info( f"/strings request: offset={offset}, limit={limit}, raw_params={params}" ) strings = self.binary_ops.get_strings(offset, limit) self._send_json_response({"strings": strings}) except Exception as e: bn.log_error(f"Error getting strings: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/allStrings": try: # Return all strings without pagination bn.log_info("/allStrings request received") strings = self.binary_ops.get_strings(0, 2147483647) self._send_json_response({"strings": strings}) except Exception as e: bn.log_error(f"Error getting all strings: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/hexdump": try: address_str = params.get("address") if not address_str: self._set_headers(content_type="text/plain", status_code=400) self.wfile.write(b"Missing address parameter\n") return # Parse address try: addr = ( int(address_str, 16) if address_str.startswith("0x") else int( address_str, 16 if all(c in "0123456789abcdefABCDEF" for c in address_str) else 10, ) ) except Exception: self._set_headers(content_type="text/plain", status_code=400) self.wfile.write(b"Invalid address format; use hex like 0x401000\n") return # Determine length length_param = params.get("length") read_len = None if length_param is not None: try: read_len = int(length_param) except Exception: read_len = None # Default to exact defined size when available if read_len is None: read_len = -1 # If negative, try to use exact defined size at this address if read_len < 0: try: inferred = self.binary_ops.infer_data_size(addr) if inferred is not None and inferred > 0: read_len = int(inferred) except Exception: pass # Fallback default length if read_len is None or read_len < 0: read_len = 64 # Read bytes try: data = self.binary_ops.current_view.read(addr, read_len) if data is None: data = b"" except Exception: data = b"" # Resolve symbol name for 
header label label = None try: sym = self.binary_ops.current_view.get_symbol_at(addr) if sym and hasattr(sym, "name"): label = sym.name except Exception: label = None # Build hexdump def _printable(b: int) -> str: try: return chr(b) if 32 <= b <= 126 else "." except Exception: return "." lines = [] addr_hex = format(addr, "x") if label: lines.append(f"{addr_hex} {label}:") else: lines.append(f"{addr_hex}:") total = len(data) offset = 0 # First line may be unaligned first_pad = addr % 16 if first_pad != 0 and total > 0: take = min(16 - first_pad, total) chunk = data[0:take] hex_area = (" " * first_pad) + "".join(f"{b:02x} " for b in chunk) hex_area += " " * (16 - first_pad - take) ascii_area = (" " * first_pad) + "".join(_printable(b) for b in chunk) ascii_area += " " * (16 - first_pad - take) lines.append(f"{addr_hex} {hex_area} {ascii_area}") offset += take # Full lines while offset < total: line_addr = addr + offset take = min(16, total - offset) chunk = data[offset : offset + take] hex_area = "".join(f"{b:02x} " for b in chunk) + (" " * (16 - take)) ascii_area = "".join(_printable(b) for b in chunk) + (" " * (16 - take)) lines.append(f"{format(line_addr, 'x')} {hex_area} {ascii_area}") offset += take text = "\n".join(lines) + "\n" self._set_headers(content_type="text/plain", status_code=200) self.wfile.write(text.encode("utf-8", errors="replace")) except Exception as e: bn.log_error(f"Error handling hexdump: {e}") self._set_headers(content_type="text/plain", status_code=500) self.wfile.write(f"Error: {e}\n".encode()) elif path == "/hexdumpByName": try: name = params.get("name") or params.get("symbol") or params.get("raw_name") if not name: self._set_headers(content_type="text/plain", status_code=400) self.wfile.write(b"Missing name parameter\n") return addr, label = self._resolve_name_to_address(name) if addr is None: self._set_headers(content_type="text/plain", status_code=404) self.wfile.write(b"Symbol not found\n") return # Determine length length_param = 
params.get("length") try: read_len = int(length_param) if length_param is not None else -1 except Exception: read_len = -1 if read_len < 0: try: inferred = self.binary_ops.infer_data_size(addr) if inferred is not None and inferred > 0: read_len = int(inferred) except Exception: pass if read_len is None or read_len < 0: read_len = 64 # Read and format try: data = self.binary_ops.current_view.read(addr, read_len) or b"" except Exception: data = b"" def _printable(b: int) -> str: try: return chr(b) if 32 <= b <= 126 else "." except Exception: return "." lines = [] addr_hex = format(addr, "x") lines.append(f"{addr_hex} {label}:") total = len(data) offset = 0 first_pad = addr % 16 if first_pad != 0 and total > 0: take = min(16 - first_pad, total) chunk = data[0:take] hex_area = (" " * first_pad) + "".join(f"{b:02x} " for b in chunk) hex_area += " " * (16 - first_pad - take) ascii_area = (" " * first_pad) + "".join(_printable(b) for b in chunk) ascii_area += " " * (16 - first_pad - take) lines.append(f"{addr_hex} {hex_area} {ascii_area}") offset += take while offset < total: line_addr = addr + offset take = min(16, total - offset) chunk = data[offset : offset + take] hex_area = "".join(f"{b:02x} " for b in chunk) + (" " * (16 - take)) ascii_area = "".join(_printable(b) for b in chunk) + (" " * (16 - take)) lines.append(f"{format(line_addr, 'x')} {hex_area} {ascii_area}") offset += take text = "\n".join(lines) + "\n" self._set_headers(content_type="text/plain", status_code=200) self.wfile.write(text.encode("utf-8", errors="replace")) except Exception as e: bn.log_error(f"Error handling hexdumpByName: {e}") self._set_headers(content_type="text/plain", status_code=500) self.wfile.write(f"Error: {e}\n".encode()) elif path == "/getDataDecl": try: ident = ( params.get("name") or params.get("symbol") or params.get("raw_name") or params.get("address") ) if not ident: self._send_json_response( { "error": "Missing name/address parameter", "help": "Provide name, symbol, raw_name, 
or address", }, 400, ) return addr, label = self._resolve_name_to_address(ident) if addr is None: self._send_json_response({"error": "Symbol not found", "ident": ident}, 404) return # Determine exact size and type size = None type_text = None try: bv = self.binary_ops.current_view dv = bv.get_data_var_at(addr) if hasattr(bv, "get_data_var_at") else None typ_obj = ( dv.type if (dv is not None and hasattr(dv, "type")) else (bv.get_type_at(addr) if hasattr(bv, "get_type_at") else None) ) if typ_obj is not None: type_text = str(typ_obj) if hasattr(typ_obj, "width") and typ_obj.width: size = int(typ_obj.width) except Exception: pass if size is None: try: inferred = self.binary_ops.infer_data_size(addr) if inferred and inferred > 0: size = int(inferred) except Exception: pass if size is None: size = 64 # Read bytes try: raw = self.binary_ops.current_view.read(addr, size) or b"" except Exception: raw = b"" # Build a declaration string (best-effort) decl = None try: # Prefer explicit char[] initialization when printable is_char_array = (type_text or "").lower().startswith( "char" ) or "char [" in (type_text or "").lower() if is_char_array and raw: esc = self._c_escape(raw.rstrip(b"\x00")) decl = f"{type_text} {label} = {esc};" else: if type_text: decl = f"{type_text} {label};" else: decl = f"/* size={size} */ {label};" except Exception: decl = f"/* size={size} */ {label};" # Also include a hexdump for convenience # Reuse the hexdump generation above def _printable(b: int) -> str: try: return chr(b) if 32 <= b <= 126 else "." except Exception: return "." 
lines = [] addr_hex = format(addr, "x") lines.append(f"{addr_hex} {label}:") total = len(raw) offset = 0 first_pad = addr % 16 if first_pad != 0 and total > 0: take = min(16 - first_pad, total) chunk = raw[0:take] hex_area = (" " * first_pad) + "".join(f"{b:02x} " for b in chunk) hex_area += " " * (16 - first_pad - take) ascii_area = (" " * first_pad) + "".join(_printable(b) for b in chunk) ascii_area += " " * (16 - first_pad - take) lines.append(f"{addr_hex} {hex_area} {ascii_area}") offset += take while offset < total: line_addr = addr + offset take = min(16, total - offset) chunk = raw[offset : offset + take] hex_area = "".join(f"{b:02x} " for b in chunk) + (" " * (16 - take)) ascii_area = "".join(_printable(b) for b in chunk) + (" " * (16 - take)) lines.append(f"{format(line_addr, 'x')} {hex_area} {ascii_area}") offset += take hexdump_text = "\n".join(lines) + "\n" self._send_json_response( { "address": hex(addr), "name": label, "size": size, "type": type_text, "decl": decl, "hexdump": hexdump_text, } ) except Exception as e: bn.log_error(f"Error handling getDataDecl: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/strings/filter": try: pattern = params.get("filter", "") bn.log_info( f"/strings/filter request: offset={offset}, limit={limit}, pattern={pattern}" ) # Get all strings first, then filter and paginate all_strings = self.binary_ops.get_strings(0, 2147483647) if pattern: pl = pattern.lower() filtered = [ s for s in all_strings if isinstance(s.get("value"), str) and pl in s.get("value", "").lower() ] else: filtered = all_strings page = filtered[offset : offset + limit] self._send_json_response({"strings": page, "total": len(filtered)}) except Exception as e: bn.log_error(f"Error filtering strings: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/searchFunctions": search_term = params.get("query", "") matches = self.endpoints.search_functions(search_term, offset, limit) self._send_json_response({"matches": 
matches}) elif path == "/decompile": function_name = params.get("name") or params.get("functionName") if not function_name: self._send_json_response( { "error": "Missing function name parameter. Use ?name=function_name or ?functionName=function_name" }, 400, ) return self._handle_decompile(function_name) elif path == "/assembly": function_name = params.get("name") or params.get("functionName") if not function_name: self._send_json_response( { "error": "Missing function name parameter. Use ?name=function_name or ?functionName=function_name" }, 400, ) return try: func_info = self.binary_ops.get_function_info(function_name) if not func_info: bn.log_error(f"Function not found: {function_name}") self._send_json_response( { "error": "Function not found", "requested_name": function_name, "available_functions": self.binary_ops.get_function_names(0, 10), }, 404, ) return bn.log_info(f"Found function for assembly: {func_info}") assembly = self.binary_ops.get_assembly_function(function_name) if assembly is None: self._send_json_response( { "error": "Assembly retrieval failed", "function": func_info, "reason": "Function assembly could not be retrieved. 
Check the Binary Ninja log for detailed error information.", }, 500, ) else: self._send_json_response({"assembly": assembly, "function": func_info}) except Exception as e: bn.log_error(f"Error handling assembly request: {e!s}") import traceback bn.log_error(traceback.format_exc()) self._send_json_response( { "error": "Assembly retrieval failed", "requested_name": function_name, "exception": str(e), }, 500, ) elif path == "/il": # Return IL by view (hlil/mlil/llil) and optional SSA form ident = params.get("name") or params.get("functionName") or params.get("address") if not ident: self._send_json_response( { "error": "Missing function identifier", "help": "Use ?name=<func> or ?address=<hex> with optional &view=hlil|mlil|llil&ssa=0|1", "received": params, }, 400, ) return view = (params.get("view") or params.get("il") or "hlil").strip() ssa_param = (params.get("ssa") or params.get("isSSA") or "0").strip().lower() ssa = ssa_param in ("1", "true", "yes", "on") try: func_info = self.binary_ops.get_function_info(ident) if not func_info: self._send_json_response( { "error": "Function not found", "requested": ident, "available_functions": self.binary_ops.get_function_names(0, 10), }, 404, ) return il_text = self.binary_ops.get_function_il(ident, view=view, ssa=ssa) if il_text is None: self._send_json_response( { "error": "Failed to get IL", "function": func_info, "view": view, "ssa": ssa, "reason": "Unsupported IL view or unavailable instructions", }, 500, ) return self._send_json_response( {"il": il_text, "function": func_info, "view": view, "ssa": ssa} ) except Exception as e: bn.log_error(f"Error handling IL request: {e!s}") self._send_json_response( { "error": "IL retrieval failed", "requested": ident, "view": view, "ssa": ssa, "exception": str(e), }, 500, ) elif path == "/functionAt": address_str = params.get("address") if not address_str: self._send_json_response( { "error": "Missing address parameter", "help": "Required parameter: address (in hex format, e.g., 
0x41d100) the address of an insruction", "received": params, }, 400, ) return try: # Convert hex string to integer if isinstance(address_str, str) and address_str.startswith("0x"): offset = int(address_str, 16) else: offset = int(address_str) # Add function to binary_operations.py function_names = self.binary_ops.get_functions_containing_address(offset) self._send_json_response({"address": hex(offset), "functions": function_names}) except ValueError: self._send_json_response( { "error": "Invalid address format", "help": "Address must be a valid hexadecimal (0x...) or decimal number", "received": address_str, }, 400, ) except Exception as e: bn.log_error(f"Error handling function_at request: {e}") self._send_json_response( { "error": str(e), "address": address_str, }, 500, ) elif path == "/getUserDefinedType": type_name = params.get("name") if not type_name: self._send_json_response( { "error": "Missing name parameter", "help": "Required parameter: name (name of the user-defined type to retrieve)", "received": params, }, 400, ) return try: # Get the user-defined type definition type_info = self.binary_ops.get_user_defined_type(type_name) if type_info: self._send_json_response(type_info) else: # If type not found, list available types for reference available_types = {} try: if ( hasattr(self.binary_ops._current_view, "user_type_container") and self.binary_ops._current_view.user_type_container ): for ( type_id ) in self.binary_ops._current_view.user_type_container.types.keys(): current_type = ( self.binary_ops._current_view.user_type_container.types[ type_id ] ) available_types[current_type[0]] = ( str(current_type[1].type) if hasattr(current_type[1], "type") else "unknown" ) except Exception as e: bn.log_error(f"Error listing available types: {e}") self._send_json_response( { "error": "Type not found", "requested_type": type_name, "available_types": available_types, }, 404, ) except Exception as e: bn.log_error(f"Error handling getUserDefinedType request: {e}") 
self._send_json_response( { "error": str(e), "type_name": type_name, }, 500, ) elif path == "/comment": if self.command == "GET": address = params.get("address") if not address: self._send_json_response( { "error": "Missing address parameter", "help": "Required parameter: address", "received": params, }, 400, ) return try: address_int = int(address, 16) if isinstance(address, str) else int(address) comment = self.binary_ops.get_comment(address_int) if comment is not None: self._send_json_response( { "success": True, "address": hex(address_int), "comment": comment, } ) else: self._send_json_response( { "success": True, "address": hex(address_int), "comment": None, "message": "No comment found at this address", } ) except ValueError: self._send_json_response({"error": "Invalid address format"}, 400) elif self.command == "DELETE": address = params.get("address") if not address: self._send_json_response( { "error": "Missing address parameter", "help": "Required parameter: address", "received": params, }, 400, ) return try: address_int = int(address, 16) if isinstance(address, str) else int(address) success = self.binary_ops.delete_comment(address_int) if success: self._send_json_response( { "success": True, "message": f"Successfully deleted comment at {hex(address_int)}", } ) else: self._send_json_response( { "error": "Failed to delete comment", "message": "The comment could not be deleted at the specified address.", }, 500, ) except ValueError: self._send_json_response({"error": "Invalid address format"}, 400) else: # POST address = params.get("address") comment = params.get("comment") if not address or comment is None: self._send_json_response( { "error": "Missing parameters", "help": "Required parameters: address and comment", "received": params, }, 400, ) return try: address_int = int(address, 16) if isinstance(address, str) else int(address) success = self.binary_ops.set_comment(address_int, comment) if success: self._send_json_response( { "success": True, 
"message": f"Successfully set comment at {hex(address_int)}", "comment": comment, } ) else: self._send_json_response( { "error": "Failed to set comment", "message": "The comment could not be set at the specified address.", }, 500, ) except ValueError: self._send_json_response({"error": "Invalid address format"}, 400) elif path == "/comment/function": if self.command == "GET": function_name = params.get("name") or params.get("functionName") if not function_name: self._send_json_response( { "error": "Missing function name parameter", "help": "Required parameter: name (or functionName)", "received": params, }, 400, ) return comment = self.binary_ops.get_function_comment(function_name) if comment is not None: self._send_json_response( { "success": True, "function": function_name, "comment": comment, } ) else: self._send_json_response( { "success": True, "function": function_name, "comment": None, "message": "No comment found for this function", } ) elif self.command == "DELETE": function_name = params.get("name") or params.get("functionName") if not function_name: self._send_json_response( { "error": "Missing function name parameter", "help": "Required parameter: name (or functionName)", "received": params, }, 400, ) return success = self.binary_ops.delete_function_comment(function_name) if success: self._send_json_response( { "success": True, "message": f"Successfully deleted comment for function {function_name}", } ) else: self._send_json_response( { "error": "Failed to delete function comment", "message": "The comment could not be deleted for the specified function.", }, 500, ) else: # POST function_name = params.get("name") or params.get("functionName") comment = params.get("comment") if not function_name or comment is None: self._send_json_response( { "error": "Missing parameters", "help": "Required parameters: name (or functionName) and comment", "received": params, }, 400, ) return success = self.binary_ops.set_function_comment(function_name, comment) if 
success: self._send_json_response( { "success": True, "message": f"Successfully set comment for function {function_name}", "comment": comment, } ) else: self._send_json_response( { "error": "Failed to set function comment", "message": "The comment could not be set for the specified function.", }, 500, ) elif path == "/getComment": address = params.get("address") if not address: self._send_json_response( { "error": "Missing address parameter", "help": "Required parameter: address", "received": params, }, 400, ) return try: address_int = int(address, 16) if isinstance(address, str) else int(address) comment = self.binary_ops.get_comment(address_int) if comment is not None: self._send_json_response( { "success": True, "address": hex(address_int), "comment": comment, } ) else: self._send_json_response( { "success": True, "address": hex(address_int), "comment": None, "message": "No comment found at this address", } ) except ValueError: self._send_json_response({"error": "Invalid address format"}, 400) elif path == "/getFunctionComment": function_name = params.get("name") or params.get("functionName") if not function_name: self._send_json_response( { "error": "Missing function name parameter", "help": "Required parameter: name (or functionName)", "received": params, }, 400, ) return comment = self.binary_ops.get_function_comment(function_name) if comment is not None: self._send_json_response( { "success": True, "function": function_name, "comment": comment, } ) else: self._send_json_response( { "success": True, "function": function_name, "comment": None, "message": "No comment found for this function", } ) elif path == "/setFunctionPrototype": # Accept both GET and POST to support long prototypes via POST body address_str = ( params.get("address") or params.get("functionAddress") or params.get("addr") or params.get("name") ) proto = params.get("prototype") or params.get("signature") or params.get("type") if not address_str or proto is None: self._send_json_response( { 
"error": "Missing parameters", "help": "Required: address (or functionAddress/addr) and prototype (or signature/type)", "received": params, }, 400, ) return try: # Do minimal validation here; the endpoint will resolve name or address result = self.endpoints.set_function_prototype(address_str, proto) self._send_json_response(result) except ValueError as ve: self._send_json_response({"error": str(ve)}, 400) except Exception as e: bn.log_error(f"Error handling setFunctionPrototype request: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/makeFunctionAt": # Create a function at an address (idempotent if already exists) address_str = params.get("address") or params.get("addr") arch = params.get("platform") or params.get("arch") or params.get("architecture") if not address_str: self._send_json_response( { "error": "Missing address parameter", "help": "Required: address (hex like 0x401000 or decimal). Optional: platform (e.g., linux-x86_64; use 'default' for view default)", }, 400, ) return try: res = self.endpoints.make_function_at(address_str, arch) # If the endpoint signals an error, forward with 400 so clients can react properly if isinstance(res, dict) and res.get("error"): self._send_json_response(res, 400) else: self._send_json_response(res) except ValueError as ve: self._send_json_response({"error": str(ve)}, 400) except Exception as e: bn.log_error(f"Error handling makeFunctionAt: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/platforms": try: self._send_json_response(self.endpoints.list_platforms()) except Exception as e: bn.log_error(f"Error listing platforms: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/setLocalVariableType": fn_ident = ( params.get("functionAddress") or params.get("address") or params.get("function") or params.get("functionName") or params.get("name") ) var_name = ( params.get("variableName") or params.get("variable") or params.get("nameOrVar") ) new_type = 
params.get("newType") or params.get("type") or params.get("signature") if not fn_ident or not var_name or new_type is None: self._send_json_response( { "error": "Missing parameters", "help": "Required: functionAddress (or address/name), variableName, newType (or type/signature)", "received": params, }, 400, ) return try: res = self.endpoints.set_local_variable_type(fn_ident, var_name, new_type) self._send_json_response(res) except ValueError as ve: self._send_json_response({"error": str(ve)}, 400) except Exception as e: bn.log_error(f"Error handling setLocalVariableType request: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/retypeVariable": function_name = params.get("functionName") if not function_name: self._send_json_response({"error": "Missing function name parameter"}, 400) return variable_name = params.get("variableName") if not variable_name: self._send_json_response({"error": "Missing variable name parameter"}, 400) return type_str = params.get("type") if not type_str: self._send_json_response({"error": "Missing type parameter"}, 400) return try: self._send_json_response( self.endpoints.retype_variable(function_name, variable_name, type_str) ) except Exception as e: bn.log_error(f"Error handling retypeVariable request: {e}") self._send_json_response( {"error": str(e)}, 500, ) elif path == "/renameVariable": function_name = params.get("functionName") if not function_name: self._send_json_response({"error": "Missing function name parameter"}, 400) return variable_name = params.get("variableName") if not variable_name: self._send_json_response({"error": "Missing variable name parameter"}, 400) return new_name = params.get("newName") if not new_name: self._send_json_response({"error": "Missing new name parameter"}, 400) return try: self._send_json_response( self.endpoints.rename_variable(function_name, variable_name, new_name) ) except Exception as e: bn.log_error(f"Error handling renameVariable request: {e}") self._send_json_response( 
{"error": str(e)}, 500, ) elif path == "/renameVariables": # Batch rename local variables in a function # Accept flexible identifiers and payload formats (GET/POST) fn_ident = ( params.get("functionAddress") or params.get("address") or params.get("function") or params.get("functionName") or params.get("name") ) if not fn_ident: self._send_json_response( { "error": "Missing function identifier", "help": "Provide functionAddress/address or functionName/name", "received": params, }, 400, ) return raw_renames = None # Prefer explicit 'renames' in JSON when POSTed if isinstance(params, dict) and "renames" in params: raw_renames = params.get("renames") # Or a JSON mapping under 'mapping' if raw_renames is None and "mapping" in params: try: m = params.get("mapping") if isinstance(m, str): raw_renames = json.loads(m) else: raw_renames = m except Exception: raw_renames = None # Or a compact 'pairs' string: old1:new1,old2:new2 if raw_renames is None and "pairs" in params: pairs_str = params.get("pairs") or "" mapping = {} try: for item in pairs_str.split(","): if not item.strip(): continue if ":" in item: o, n = item.split(":", 1) mapping[o.strip()] = n.strip() except Exception: mapping = {} raw_renames = mapping if raw_renames is None: self._send_json_response( { "error": "Missing renames payload", "help": "Provide 'renames' (array of {old,new}) or 'mapping' (JSON object old->new) or 'pairs' (old:new,...)", "received": params, }, 400, ) return try: result = self.endpoints.rename_variables(fn_ident, raw_renames) self._send_json_response(result) except ValueError as ve: self._send_json_response({"error": str(ve)}, 400) except Exception as e: bn.log_error(f"Error handling renameVariables request: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/getXrefsTo": address_str = params.get("address") if not address_str: self._send_json_response( { "error": "Missing address parameter", "help": "Required parameter: address (hex like 0x401000 or decimal)", 
"received": params, }, 400, ) return try: result = self.binary_ops.get_xrefs_to_address(address_str) self._send_json_response(result) except ValueError as ve: self._send_json_response({"error": str(ve)}, 400) except Exception as e: bn.log_error(f"Error handling getXrefsTo request: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/getXrefsToField": struct_name = params.get("struct") or params.get("structName") field_name = params.get("field") or params.get("fieldName") if not struct_name or not field_name: self._send_json_response( { "error": "Missing parameters", "help": "Required: struct (or structName), field (or fieldName)", "received": params, }, 400, ) return try: refs = self.binary_ops.get_xrefs_to_field(struct_name, field_name) self._send_json_response( {"struct": struct_name, "field": field_name, "references": refs} ) except Exception as e: bn.log_error(f"Error handling getXrefsToField: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/getXrefsToStruct": struct_name = params.get("name") or params.get("struct") or params.get("structName") if not struct_name: self._send_json_response( { "error": "Missing struct name parameter", "help": "Required: name (or struct/structName)", "received": params, }, 400, ) return try: refs = self.binary_ops.get_xrefs_to_struct(struct_name) self._send_json_response(refs) except Exception as e: bn.log_error(f"Error handling getXrefsToStruct: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/getXrefsToType": type_name = params.get("name") or params.get("type") or params.get("typeName") if not type_name: self._send_json_response( { "error": "Missing type name parameter", "help": "Required: name (or type/typeName)", "received": params, }, 400, ) return try: refs = self.binary_ops.get_xrefs_to_type(type_name) self._send_json_response(refs) except Exception as e: bn.log_error(f"Error handling getXrefsToType: {e}") self._send_json_response({"error": str(e)}, 500) elif path == 
"/getTypeInfo": type_name = params.get("name") or params.get("type") or params.get("typeName") if not type_name: self._send_json_response( { "error": "Missing type name parameter", "help": "Required: name (or type/typeName)", }, 400, ) return try: info = self.binary_ops.get_type_info(type_name) self._send_json_response(info) except Exception as e: bn.log_error(f"Error handling getTypeInfo: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/getXrefsToEnum": enum_name = params.get("name") or params.get("enum") or params.get("enumName") if not enum_name: self._send_json_response( { "error": "Missing enum name parameter", "help": "Required: name (or enum/enumName)", "received": params, }, 400, ) return try: refs = self.binary_ops.get_xrefs_to_enum(enum_name) self._send_json_response(refs) except Exception as e: bn.log_error(f"Error handling getXrefsToEnum: {e}") self._send_json_response({"error": str(e)}, 500) # '/displayAs' endpoint removed per request elif path == "/formatValue": # Compute representations and annotate BN at an address text = params.get("text") size_param = params.get("size") address_str = params.get("address") if not text or not address_str: self._send_json_response( { "error": "Missing parameters", "help": "Required: address, text. 
Optional: size", "received": params, }, 400, ) return try: # Parse address if isinstance(address_str, str) and address_str.startswith("0x"): addr = int(address_str, 16) else: addr = int(address_str) except Exception: self._send_json_response({"error": "Invalid address format"}, 400) return try: conv = util_convert_number(text, size_param) # Create a concise annotation bases = conv.get("bases", {}) c_lit = conv.get("c_literal") c_str = conv.get("c_string") parts = [] if "hex" in bases: parts.append(f"hex={bases['hex']}") if "dec" in bases: parts.append(f"dec={bases['dec']}") if c_lit: parts.append(f"char={c_lit}") if c_str: # Trim long strings for comments s = c_str if len(s) > 64: s = s[:61] + '"…' parts.append(f"str={s}") annot = "Converted: " + ", ".join(parts) if parts else f"Converted: {conv}" applied = self.binary_ops.set_comment(addr, annot) self._send_json_response( { "address": hex(addr), "converted": conv, "applied_comment": bool(applied), "comment": annot, } ) except Exception as e: bn.log_error(f"Error handling formatValue: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/convertNumber": # Compute number/string representations (bases, LE/BE, C literals) try: text = params.get("text") size_param = params.get("size") if text is None: self._send_json_response( { "error": "Missing text parameter", "help": "Required: text. 
Optional: size (1,2,4,8 or 0 for auto)", "received": params, }, 400, ) return conv = util_convert_number(text, size_param) self._send_json_response(conv) except Exception as e: bn.log_error(f"Error handling convertNumber: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/getXrefsToUnion": union_name = params.get("name") or params.get("union") or params.get("unionName") if not union_name: self._send_json_response( { "error": "Missing union name parameter", "help": "Required: name (or union/unionName)", "received": params, }, 400, ) return try: refs = self.binary_ops.get_xrefs_to_union(union_name) self._send_json_response(refs) except Exception as e: bn.log_error(f"Error handling getXrefsToUnion: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/getStackFrameVars": function_identifier = params.get("name") or params.get("address") if not function_identifier: self._send_json_response( { "error": "Missing required parameter: name or address", "received": params, }, 400, ) return try: result = self.endpoints.get_stack_frame_vars(function_identifier) self._send_json_response({"stack_frame_vars": result}) except ValueError as ve: self._send_json_response({"error": str(ve)}, 400) except Exception as e: bn.log_error(f"Error handling getStackFrameVars request: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/defineTypes": c_code = params.get("cCode") if not c_code: self._send_json_response({"error": "Missing cCode parameter"}, 400) return try: self._send_json_response(self.endpoints.define_types(c_code)) except Exception as e: bn.log_error(f"Error handling defineTypes request: {e}") self._send_json_response( {"error": str(e)}, 500, ) elif path == "/declareCType": c_decl = ( params.get("declaration") or params.get("cDecl") or params.get("cDeclaration") or params.get("decl") ) if not c_decl: self._send_json_response( { "error": "Missing declaration parameter", "help": "Use 'declaration' with a single C type 
declaration", }, 400, ) return try: self._send_json_response(self.endpoints.declare_c_type(c_decl)) except ValueError as ve: self._send_json_response({"error": str(ve)}, 400) except Exception as e: bn.log_error(f"Error handling declareCType request: {e}") self._send_json_response({"error": str(e)}, 500) elif path == "/patch" or path == "/patchBytes": address = params.get("address") or params.get("addr") data = params.get("data") or params.get("bytes") # Parse save_to_file parameter (default True for backwards compatibility) save_to_file_param = params.get("save_to_file", True) if isinstance(save_to_file_param, bool): save_to_file = save_to_file_param else: save_to_file = str(save_to_file_param).lower() not in ( "false", "0", "no", ) if not address: self._send_json_response( { "error": "Missing address parameter", "help": "Required: address (hex like 0x401000 or decimal). Optional: data (hex string like '90 90' or '9090'), save_to_file (bool, default true)", "received": params, }, 400, ) return if not data: self._send_json_response( { "error": "Missing data parameter", "help": "Required: data (hex string like '90 90' or '9090', or list of integers)", "received": params, }, 400, ) return try: # Parse data if it's a JSON string (for list format) if isinstance(data, str): try: # Try to parse as JSON array parsed = json.loads(data) if isinstance(parsed, list): data = parsed except (json.JSONDecodeError, ValueError): # Not JSON, treat as hex string pass result = self.endpoints.patch_bytes(address, data, save_to_file) self._send_json_response(result) except ValueError as ve: self._send_json_response({"error": str(ve)}, 400) except Exception as e: bn.log_error(f"Error handling patch request: {e}") self._send_json_response({"error": str(e)}, 500) else: self._send_json_response({"error": "Not found"}, 404) except Exception as e: bn.log_error(f"Error handling GET request: {e}") self._send_json_response({"error": str(e)}, 500) def _handle_decompile(self, function_name: str): 
# NOTE(review): every function below up to `class MCPServer` is a method of
# MCPRequestHandler (they all take `self` and call `self._send_json_response`).
# The class header lies outside this excerpt and the excerpt itself carries no
# indentation, so re-nest these one level when splicing them into the class.


def _handle_decompile(self, function_name: str):
    """Decompile one function and send the outcome as a JSON response.

    Args:
        function_name: Name or address of the function to decompile.

    Responses:
        200: ``{"decompiled": ..., "function": ...}``.
        404: ``get_function_info`` returned nothing; the reply also carries
            the result of ``get_function_names(0, 10)`` as a hint.
        500: ``decompile_function`` returned ``None`` or an exception escaped.
    """
    try:
        func_info = self.binary_ops.get_function_info(function_name)
        if not func_info:
            bn.log_error(f"Function not found: {function_name}")
            self._send_json_response(
                {
                    "error": "Function not found",
                    "requested_name": function_name,
                    "available_functions": self.binary_ops.get_function_names(0, 10),
                },
                404,
            )
            return

        bn.log_info(f"Found function for decompilation: {func_info}")
        decompiled = self.binary_ops.decompile_function(function_name)
        if decompiled is not None:
            self._send_json_response({"decompiled": decompiled, "function": func_info})
            return

        self._send_json_response(
            {
                "error": "Decompilation failed",
                "function": func_info,
                "reason": "Function could not be decompiled. This might be due to missing debug information or unsupported function type.",
            },
            500,
        )
    except Exception as e:
        bn.log_error(f"Error during decompilation: {e}")
        self._send_json_response(
            {
                "error": f"Decompilation error: {e!s}",
                "requested_name": function_name,
            },
            500,
        )


def do_POST(self):
    """Route a POST request to the handler for its URL path.

    Paths served: ``/load``, ``/rename/function`` (``/renameFunction``),
    ``/rename/data`` (``/renameData``), ``/comment``, ``/comment/function``,
    ``/getComment``, ``/getFunctionComment`` and ``/patch`` (``/patchBytes``).
    Any other path gets a 404; any exception that escapes a handler is logged
    and reported as a 500.
    """
    try:
        # NOTE(review): presumably _check_binary_loaded sends its own error
        # response when it returns False; it is defined outside this excerpt.
        if not self._check_binary_loaded():
            return

        params = self._parse_post_params()
        path = urllib.parse.urlparse(self.path).path
        bn.log_info(f"POST {path} with params: {params}")

        if path == "/load":
            self._post_load(params)
        elif path in ("/rename/function", "/renameFunction"):
            self._post_rename_function(params)
        elif path in ("/rename/data", "/renameData"):
            self._post_rename_data(params)
        elif path == "/comment":
            self._post_comment(params)
        elif path == "/comment/function":
            self._post_function_comment(params)
        elif path == "/getComment":
            self._reply_address_comment(params)
        elif path == "/getFunctionComment":
            # This route checks functionName before name (the reverse of
            # /comment/function); only matters when both are supplied.
            self._reply_function_comment(
                params, params.get("functionName") or params.get("name")
            )
        elif path in ("/patch", "/patchBytes"):
            self._post_patch(params)
        else:
            self._send_json_response({"error": "Not found"}, 404)
    except Exception as e:
        bn.log_error(f"Error handling POST request: {e}")
        self._send_json_response({"error": str(e)}, 500)


def _post_parse_address(self, address):
    """Return *address* as an int: strings are read as base 16, anything else via ``int()``.

    Raises:
        ValueError: if a string is not valid hexadecimal.
    """
    return int(address, 16) if isinstance(address, str) else int(address)


def _reply_missing(self, error, help_text, params):
    """Send the standard 400 reply for missing parameters, echoing what was received."""
    self._send_json_response({"error": error, "help": help_text, "received": params}, 400)


def _post_load(self, params):
    """Handle ``/load``: load the binary named by the ``filepath`` parameter."""
    filepath = params.get("filepath")
    if not filepath:
        self._send_json_response({"error": "Missing filepath parameter"}, 400)
        return
    try:
        self.binary_ops.load_binary(filepath)
        self._send_json_response({"success": True, "message": f"Binary loaded: {filepath}"})
    except Exception as e:
        self._send_json_response({"error": str(e)}, 500)


def _post_rename_function(self, params):
    """Handle ``/rename/function``: rename a function given by name or address."""
    old_name = params.get("oldName") or params.get("old_name")
    new_name = params.get("newName") or params.get("new_name")
    bn.log_info(
        f"Rename request - old_name: {old_name}, new_name: {new_name}, params: {params}"
    )
    if not old_name or not new_name:
        self._reply_missing(
            "Missing parameters",
            "Required parameters: oldName (or old_name) and newName (or new_name)",
            params,
        )
        return

    # Accept an address as either 0x-prefixed hex or a plain decimal number.
    if isinstance(old_name, str):
        if old_name.startswith("0x"):
            try:
                old_name = int(old_name, 16)
            except ValueError:
                # Not valid hex after all: keep it and look it up as a name.
                pass
        elif old_name.isdigit():
            old_name = int(old_name)
    bn.log_info(f"Attempting to rename function: {old_name} -> {new_name}")

    # Apply the rename prefix from settings unless the name already carries it.
    prefix = Settings().get_string("mcp.renamePrefix")
    if prefix and not new_name.startswith(prefix):
        new_name = prefix + new_name
        bn.log_info(f"Applied prefix '{prefix}': {new_name}")

    # Look the function up first so "not found" and "rename failed" are
    # reported differently.
    func_info = self.binary_ops.get_function_info(old_name)
    if not func_info:
        available_funcs = self.binary_ops.get_function_names(0, 10)
        bn.log_error(f"Function not found: {old_name}")
        self._send_json_response(
            {
                "error": "Function not found",
                "requested": old_name,
                "help": "Make sure the function exists. You can use either the function name or its address.",
                "available_functions": available_funcs,
            },
            404,
        )
        return

    bn.log_info(f"Found function: {func_info}")
    if self.binary_ops.rename_function(old_name, new_name):
        self._send_json_response(
            {
                "success": True,
                "message": f"Successfully renamed function from {old_name} to {new_name}",
                "function": func_info,
            }
        )
    else:
        self._send_json_response(
            {
                "error": "Failed to rename function",
                "message": "The function was found but could not be renamed. This might be due to permissions or binary restrictions.",
                "function": func_info,
            },
            500,
        )


def _post_rename_data(self, params):
    """Handle ``/rename/data``: rename the data item at ``address``."""
    address = params.get("address")
    new_name = params.get("newName") or params.get("new_name")
    if not address or not new_name:
        self._send_json_response({"error": "Missing parameters"}, 400)
        return
    try:
        address_int = self._post_parse_address(address)
        success = self.binary_ops.rename_data(address_int, new_name)
        self._send_json_response({"success": success})
    except ValueError:
        self._send_json_response({"error": "Invalid address format"}, 400)


def _reply_address_comment(self, params):
    """Send the comment stored at ``address`` (``None`` plus a message if there is none)."""
    address = params.get("address")
    if not address:
        self._reply_missing(
            "Missing address parameter", "Required parameter: address", params
        )
        return
    try:
        address_int = self._post_parse_address(address)
        comment = self.binary_ops.get_comment(address_int)
        payload = {"success": True, "address": hex(address_int), "comment": comment}
        if comment is None:
            payload["message"] = "No comment found at this address"
        self._send_json_response(payload)
    except ValueError:
        self._send_json_response({"error": "Invalid address format"}, 400)


def _reply_function_comment(self, params, function_name):
    """Send the comment attached to *function_name* (``None`` plus a message if there is none)."""
    if not function_name:
        self._reply_missing(
            "Missing function name parameter",
            "Required parameter: name (or functionName)",
            params,
        )
        return
    comment = self.binary_ops.get_function_comment(function_name)
    payload = {"success": True, "function": function_name, "comment": comment}
    if comment is None:
        payload["message"] = "No comment found for this function"
    self._send_json_response(payload)


def _post_comment(self, params):
    """Handle ``/comment``: read, delete or set an address comment.

    The operation is chosen by ``self.command``: ``GET`` reads, ``DELETE``
    deletes, anything else sets.
    """
    if self.command == "GET":
        self._reply_address_comment(params)
        return

    address = params.get("address")
    if self.command == "DELETE":
        if not address:
            self._reply_missing(
                "Missing address parameter", "Required parameter: address", params
            )
            return
        try:
            address_int = self._post_parse_address(address)
            if self.binary_ops.delete_comment(address_int):
                self._send_json_response(
                    {
                        "success": True,
                        "message": f"Successfully deleted comment at {hex(address_int)}",
                    }
                )
            else:
                self._send_json_response(
                    {
                        "error": "Failed to delete comment",
                        "message": "The comment could not be deleted at the specified address.",
                    },
                    500,
                )
        except ValueError:
            self._send_json_response({"error": "Invalid address format"}, 400)
        return

    # POST: an empty-string comment is allowed, only an absent one is rejected.
    comment = params.get("comment")
    if not address or comment is None:
        self._reply_missing(
            "Missing parameters", "Required parameters: address and comment", params
        )
        return
    try:
        address_int = self._post_parse_address(address)
        if self.binary_ops.set_comment(address_int, comment):
            self._send_json_response(
                {
                    "success": True,
                    "message": f"Successfully set comment at {hex(address_int)}",
                    "comment": comment,
                }
            )
        else:
            self._send_json_response(
                {
                    "error": "Failed to set comment",
                    "message": "The comment could not be set at the specified address.",
                },
                500,
            )
    except ValueError:
        self._send_json_response({"error": "Invalid address format"}, 400)


def _post_function_comment(self, params):
    """Handle ``/comment/function``: read, delete or set a function comment.

    The operation is chosen by ``self.command``: ``GET`` reads, ``DELETE``
    deletes, anything else sets.
    """
    function_name = params.get("name") or params.get("functionName")
    if self.command == "GET":
        self._reply_function_comment(params, function_name)
        return

    if self.command == "DELETE":
        if not function_name:
            self._reply_missing(
                "Missing function name parameter",
                "Required parameter: name (or functionName)",
                params,
            )
            return
        if self.binary_ops.delete_function_comment(function_name):
            self._send_json_response(
                {
                    "success": True,
                    "message": f"Successfully deleted comment for function {function_name}",
                }
            )
        else:
            self._send_json_response(
                {
                    "error": "Failed to delete function comment",
                    "message": "The comment could not be deleted for the specified function.",
                },
                500,
            )
        return

    # POST: an empty-string comment is allowed, only an absent one is rejected.
    comment = params.get("comment")
    if not function_name or comment is None:
        self._reply_missing(
            "Missing parameters",
            "Required parameters: name (or functionName) and comment",
            params,
        )
        return
    if self.binary_ops.set_function_comment(function_name, comment):
        self._send_json_response(
            {
                "success": True,
                "message": f"Successfully set comment for function {function_name}",
                "comment": comment,
            }
        )
    else:
        self._send_json_response(
            {
                "error": "Failed to set function comment",
                "message": "The comment could not be set for the specified function.",
            },
            500,
        )


def _post_patch(self, params):
    """Handle ``/patch``: write bytes at an address via ``endpoints.patch_bytes``."""
    address = params.get("address") or params.get("addr")
    data = params.get("data") or params.get("bytes")

    # save_to_file defaults to True for backwards compatibility; a non-bool
    # value counts as False only when it spells "false", "0" or "no".
    save_to_file_param = params.get("save_to_file", True)
    if isinstance(save_to_file_param, bool):
        save_to_file = save_to_file_param
    else:
        save_to_file = str(save_to_file_param).lower() not in ("false", "0", "no")

    if not address:
        self._reply_missing(
            "Missing address parameter",
            "Required: address (hex like 0x401000 or decimal). Optional: data (hex string like '90 90' or '9090'), save_to_file (bool, default true)",
            params,
        )
        return
    if not data:
        self._reply_missing(
            "Missing data parameter",
            "Required: data (hex string like '90 90' or '9090', or list of integers)",
            params,
        )
        return

    try:
        if isinstance(data, str):
            # A string may be a JSON array (list form); anything that is not
            # valid JSON, or not a list, is passed through as a hex string.
            try:
                parsed = json.loads(data)
            except (json.JSONDecodeError, ValueError):
                pass
            else:
                if isinstance(parsed, list):
                    data = parsed
        result = self.endpoints.patch_bytes(address, data, save_to_file)
        self._send_json_response(result)
    except ValueError as ve:
        self._send_json_response({"error": str(ve)}, 400)
    except Exception as e:
        bn.log_error(f"Error handling patch request: {e}")
        self._send_json_response({"error": str(e)}, 500)


class MCPServer:
    """HTTP server for Binary Ninja MCP plugin.

    Provides REST API endpoints for:
    - Binary analysis and manipulation
    - Function decompilation
    - Symbol renaming
    - Data inspection

    ``server`` and ``thread`` are ``None`` whenever the server is not running,
    so callers can use them to detect the stopped state.
    """

    def __init__(self, config: Config):
        self.config = config
        self.server = None
        self.thread = None
        self.binary_ops = BinaryOperations(config.binary_ninja)

    def start(self):
        """Start the HTTP server in a background daemon thread.

        Calling this while a server is already running is a no-op. Without
        the guard a second call would replace ``self.server``/``self.thread``
        and leave the first server running with no way to stop it.
        """
        if self.server is not None:
            bn.log_warn("Server already running; ignoring start request")
            return

        host = self.config.server.host
        port = self.config.server.port

        # Subclass the handler so the class attribute `binary_ops` is bound
        # to this server's BinaryOperations instance.
        handler_class = type(
            "MCPRequestHandlerWithOps",
            (MCPRequestHandler,),
            {"binary_ops": self.binary_ops},
        )
        self.server = HTTPServer((host, port), handler_class)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.daemon = True
        self.thread.start()
        bn.log_info(f"Server started on {self.config.server.host}:{self.config.server.port}")

    def stop(self):
        """Stop the HTTP server and clean up resources; no-op if not running."""
        if not self.server:
            return
        self.server.shutdown()
        self.server.server_close()
        if self.thread:
            self.thread.join()
        # Clear references so callers can reliably detect stopped state
        self.thread = None
        self.server = None
        bn.log_info("Server stopped")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fosdickio/binary_ninja_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.