# bridge_mcp_ghidra.py (25.6 kB)
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "requests>=2,<3",
# "mcp>=1.2.0,<2",
# ]
# ///
import sys
import json
import requests
import argparse
import logging
from urllib.parse import urljoin
from mcp.server.fastmcp import FastMCP
# Base URL of the Ghidra HTTP server used when --ghidra-server is not supplied.
DEFAULT_GHIDRA_SERVER = "http://127.0.0.1:8080/"
# Default per-request timeout handed to `requests` (which interprets it in seconds).
DEFAULT_REQUEST_TIMEOUT = 5
# Module-level logger; main() configures logging only for the "sse" transport.
logger = logging.getLogger(__name__)
# FastMCP server instance; the tool functions below register on it via @mcp.tool().
mcp = FastMCP("ghidra-mcp")
# Initialize ghidra_server_url with default value; main() may overwrite it
# from the --ghidra-server command-line option.
ghidra_server_url = DEFAULT_GHIDRA_SERVER
# Initialize ghidra_request_timeout with default value; main() may overwrite it
# from the --ghidra-timeout command-line option.
ghidra_request_timeout = DEFAULT_REQUEST_TIMEOUT
def safe_get(endpoint: str, params: dict | None = None, timeout: int | None = None) -> list[str]:
    """
    Perform a GET request against the Ghidra server with optional query parameters.

    Args:
        endpoint: Path joined onto the module-level ``ghidra_server_url``.
        params: Query parameters for the request; ``None`` means no parameters.
        timeout: Timeout passed to ``requests.get``; ``None`` falls back to the
            module-level ``ghidra_request_timeout``.
    Returns:
        The response body split into lines when the response is OK; otherwise a
        one-element list holding either "Error <status>: <body>" or
        "Request failed: <exception text>". This function never raises.
    """
    if params is None:
        params = {}
    if timeout is None:
        timeout = ghidra_request_timeout
    try:
        # Build the URL inside the try block (as safe_post does) so that a
        # malformed endpoint is reported as an error line instead of raising.
        url = urljoin(ghidra_server_url, endpoint)
        response = requests.get(url, params=params, timeout=timeout)
        # Force UTF-8 decoding rather than relying on requests' charset guess.
        response.encoding = 'utf-8'
        if response.ok:
            return response.text.splitlines()
        return [f"Error {response.status_code}: {response.text.strip()}"]
    except Exception as e:
        # Deliberate catch-all: tool callers expect an error string, not a traceback.
        return [f"Request failed: {e}"]
def safe_post(endpoint: str, data: dict | str) -> str:
    """
    Perform a POST request against the Ghidra server and return its reply as text.

    Args:
        endpoint: Path joined onto the module-level ``ghidra_server_url``.
        data: Request body. A dict is handed to ``requests`` unchanged; a string
            is encoded to UTF-8 bytes before sending.
    Returns:
        The stripped response body when the response is OK; otherwise
        "Error <status>: <body>" or "Request failed: <exception text>".
        This function never raises.
    """
    try:
        target = urljoin(ghidra_server_url, endpoint)
        body = data if isinstance(data, dict) else data.encode("utf-8")
        reply = requests.post(target, data=body, timeout=ghidra_request_timeout)
        # Force UTF-8 decoding rather than relying on requests' charset guess.
        reply.encoding = 'utf-8'
        text = reply.text.strip()
        if not reply.ok:
            return f"Error {reply.status_code}: {text}"
        return text
    except Exception as exc:
        # Deliberate catch-all: tool callers expect an error string, not a traceback.
        return f"Request failed: {str(exc)}"
@mcp.tool()
def list_methods(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the function names found in the program.

    The listing includes source type information for confidence levels:
    - [IMPORTED] = High confidence (from original symbol table)
    - [USER_DEFINED] = Medium confidence (analyst-created)
    - [DEFAULT] = Low confidence (Ghidra-generated names like FUN_004010a0)

    Args:
        offset: Pagination offset sent to the server (default: 0)
        limit: Pagination limit sent to the server (default: 100)
    Returns:
        The server response split into lines, or a one-element list with an
        error message if the request did not succeed.
    """
    paging = {"offset": offset, "limit": limit}
    return safe_get("methods", paging)
@mcp.tool()
def list_classes(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the namespace/class names found in the program.

    Args:
        offset: Pagination offset sent to the server (default: 0)
        limit: Pagination limit sent to the server (default: 100)
    Returns:
        The server response split into lines, or a one-element list with an
        error message if the request did not succeed.
    """
    paging = {"offset": offset, "limit": limit}
    return safe_get("classes", paging)
@mcp.tool()
def decompile_function(name: str) -> str:
    """
    Decompile the function with the given name and return its decompiled C code.

    Args:
        name: Name of the function to decompile; sent as the raw POST body.
    Returns:
        The server's reply text, or an error message if the request failed.
    """
    decompiled = safe_post("decompile", name)
    return decompiled
@mcp.tool()
def rename_function(old_name: str, new_name: str) -> str:
    """
    Give a function, identified by its current name, a new user-defined name.

    Functions from the original symbol table are protected and cannot be renamed.

    Args:
        old_name: Current name of the function
        new_name: New name to set for the function
    Returns:
        A status message indicating success, protection, or failure.
    """
    payload = {"oldName": old_name, "newName": new_name}
    return safe_post("renameFunction", payload)
@mcp.tool()
def rename_data(address: str, new_name: str) -> str:
    """
    Rename the data label located at the given address.

    Data labels from the original symbol table are protected and cannot be renamed.

    Args:
        address: Address of the data to rename
        new_name: New name for the data
    Returns:
        A status message indicating success, protection, or failure.
    """
    payload = {"address": address, "newName": new_name}
    return safe_post("renameData", payload)
@mcp.tool()
def retype_data(address: str, data_type: str) -> str:
    """
    Set the data type of global data at the given address.

    Creates data if none exists at the address, or updates the existing data type.

    Args:
        address: Address of the data to retype (e.g., "0x401000")
        data_type: New data type name (e.g., "int", "char*", "MyStruct")
    Returns:
        A status message indicating success or failure.
    """
    payload = {"address": address, "dataType": data_type}
    return safe_post("retypeData", payload)
@mcp.tool()
def list_segments(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the memory segments in the program.

    Args:
        offset: Pagination offset sent to the server (default: 0)
        limit: Pagination limit sent to the server (default: 100)
    Returns:
        The server response split into lines, or a one-element list with an
        error message if the request did not succeed.
    """
    paging = {"offset": offset, "limit": limit}
    return safe_get("segments", paging)
@mcp.tool()
def list_imports(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the symbols imported by the program.

    The listing includes source type information for confidence levels:
    - [IMPORTED] = High confidence (from original symbol table)
    - [USER_DEFINED] = Medium confidence (analyst-created)
    - [DEFAULT] = Low confidence (Ghidra-generated)

    Args:
        offset: Pagination offset sent to the server (default: 0)
        limit: Pagination limit sent to the server (default: 100)
    Returns:
        The server response split into lines, or a one-element list with an
        error message if the request did not succeed.
    """
    paging = {"offset": offset, "limit": limit}
    return safe_get("imports", paging)
@mcp.tool()
def list_exports(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the functions/symbols exported by the program.

    The listing includes source type information for confidence levels:
    - [IMPORTED] = High confidence (from original symbol table)
    - [USER_DEFINED] = Medium confidence (analyst-created)
    - [DEFAULT] = Low confidence (Ghidra-generated)

    Args:
        offset: Pagination offset sent to the server (default: 0)
        limit: Pagination limit sent to the server (default: 100)
    Returns:
        The server response split into lines, or a one-element list with an
        error message if the request did not succeed.
    """
    paging = {"offset": offset, "limit": limit}
    return safe_get("exports", paging)
@mcp.tool()
def list_namespaces(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the non-global namespaces in the program.

    Args:
        offset: Pagination offset sent to the server (default: 0)
        limit: Pagination limit sent to the server (default: 100)
    Returns:
        The server response split into lines, or a one-element list with an
        error message if the request did not succeed.
    """
    paging = {"offset": offset, "limit": limit}
    return safe_get("namespaces", paging)
@mcp.tool()
def list_data_items(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the defined data labels together with their values.

    The listing includes source type information for confidence levels:
    - [IMPORTED] = High confidence (from original symbol table)
    - [USER_DEFINED] = Medium confidence (analyst-created)
    - [DEFAULT] = Low confidence (Ghidra-generated)
    - [NONE] = No symbol defined at this address

    Args:
        offset: Pagination offset sent to the server (default: 0)
        limit: Pagination limit sent to the server (default: 100)
    Returns:
        The server response split into lines, or a one-element list with an
        error message if the request did not succeed.
    """
    paging = {"offset": offset, "limit": limit}
    return safe_get("data", paging)
@mcp.tool()
def search_functions_by_name(query: str, offset: int = 0, limit: int = 100) -> list:
    """
    Find functions whose name contains the given substring.

    Args:
        query: Substring to look for; must be non-empty.
        offset: Pagination offset sent to the server (default: 0)
        limit: Pagination limit sent to the server (default: 100)
    Returns:
        The server response split into lines. An empty query is rejected
        locally with a one-element error list and no request is sent.
    """
    if query:
        search = {"query": query, "offset": offset, "limit": limit}
        return safe_get("searchFunctions", search)
    return ["Error: query string is required"]
@mcp.tool()
def rename_variable(function_name: str, old_name: str, new_name: str) -> str:
    """
    Rename a local variable inside a function.

    Variables from debug information are protected and cannot be renamed.

    Args:
        function_name: Name of the function containing the variable
        old_name: Current name of the variable
        new_name: New name for the variable
    Returns:
        A status message indicating success, protection, or failure.
    """
    payload = dict(functionName=function_name, oldName=old_name, newName=new_name)
    return safe_post("renameVariable", payload)
@mcp.tool()
def get_function_by_address(address: str) -> str:
    """
    Look up a function by its address.

    Args:
        address: Address used to look up the function.
    Returns:
        The server response as a single newline-joined string (or an error message).
    """
    lines = safe_get("get_function_by_address", {"address": address})
    return "\n".join(lines)
@mcp.tool()
def get_current_address() -> str:
    """
    Return the address the user currently has selected.

    Returns:
        The server response as a single newline-joined string (or an error message).
    """
    lines = safe_get("get_current_address")
    return "\n".join(lines)
@mcp.tool()
def get_current_function() -> str:
    """
    Return the function the user currently has selected.

    Returns:
        The server response as a single newline-joined string (or an error message).
    """
    lines = safe_get("get_current_function")
    return "\n".join(lines)
@mcp.tool()
def list_functions() -> list:
    """
    Return every function in the database together with source type information.

    The source type indicates the confidence level of each name:
    - [IMPORTED] = High confidence (from original symbol table)
    - [USER_DEFINED] = Medium confidence (analyst-created)
    - [DEFAULT] = Low confidence (Ghidra-generated names like FUN_004010a0)

    Returns:
        The server response split into lines, or a one-element list with an
        error message if the request did not succeed.
    """
    functions = safe_get("list_functions")
    return functions
@mcp.tool()
def list_types() -> list:
    """
    Return all data types available in the data type manager.

    Covers built-in types, imported types, and user-defined structures.
    Each type shows its name, display name, and category path.

    Returns:
        A list of strings containing type information in the format:
        "TypeName (DisplayName) [category]"
    """
    types = safe_get("list_types")
    return types
@mcp.tool()
def decompile_function_by_address(address: str) -> str:
    """
    Decompile the function located at the given address.

    Args:
        address: Address of the function to decompile.
    Returns:
        The server response as a single newline-joined string (or an error message).
    """
    lines = safe_get("decompile_function", {"address": address})
    return "\n".join(lines)
@mcp.tool()
def disassemble_function(address: str) -> list:
    """
    Return the assembly code (address: instruction; comment) of a function.

    Args:
        address: Address of the function to disassemble.
    Returns:
        The server response split into lines, or a one-element list with an
        error message if the request did not succeed.
    """
    query = {"address": address}
    return safe_get("disassemble_function", query)
@mcp.tool()
def set_decompiler_comment(address: str, comment: str) -> str:
    """
    Attach a comment to the given address in the function pseudocode.

    Args:
        address: Address the comment is attached to.
        comment: Comment text to set.
    Returns:
        The server's reply text, or an error message if the request failed.
    """
    payload = {"address": address, "comment": comment}
    return safe_post("set_decompiler_comment", payload)
@mcp.tool()
def set_disassembly_comment(address: str, comment: str) -> str:
    """
    Attach a comment to the given address in the function disassembly.

    Args:
        address: Address the comment is attached to.
        comment: Comment text to set.
    Returns:
        The server's reply text, or an error message if the request failed.
    """
    payload = {"address": address, "comment": comment}
    return safe_post("set_disassembly_comment", payload)
@mcp.tool()
def rename_function_by_address(function_address: str, new_name: str) -> str:
    """
    Rename the function located at the given address.

    Functions from the original symbol table are protected and cannot be renamed.

    Args:
        function_address: Address of the function to rename
        new_name: New name for the function
    Returns:
        A status message indicating success, protection, or failure.
    """
    payload = dict(function_address=function_address, new_name=new_name)
    return safe_post("rename_function_by_address", payload)
@mcp.tool()
def set_function_prototype(function_address: str, prototype: str) -> str:
    """
    Apply a prototype (return type, parameter names and types) to a function.

    Function names from the original symbol table are protected and will not be changed.
    Return type and parameter information will still be applied even if the name is protected.

    Example: set_function_prototype("0x00401000", "int malloc(size_t size)")
    - If "malloc" is from original symbol table: name stays "malloc", return type and params applied
    - If function name is user-defined: entire prototype applied including any name change

    Args:
        function_address: Address of the function to update.
        prototype: Prototype string to apply.
    Returns:
        The server's reply text, or an error message if the request failed.
    """
    payload = dict(function_address=function_address, prototype=prototype)
    return safe_post("set_function_prototype", payload)
@mcp.tool()
def set_local_variable_type(function_address: str, variable_name: str, new_type: str) -> str:
    """
    Change the type of a local variable.

    Args:
        function_address: Address of the function containing the variable.
        variable_name: Name of the local variable to retype.
        new_type: Name of the type to assign.
    Returns:
        The server's reply text, or an error message if the request failed.
    """
    payload = dict(
        function_address=function_address,
        variable_name=variable_name,
        new_type=new_type,
    )
    return safe_post("set_local_variable_type", payload)
@mcp.tool()
def get_xrefs_to(address: str, offset: int = 0, limit: int = 100) -> list:
    """
    Return the references that point to the given address (xref to).

    Args:
        address: Target address in hex format (e.g. "0x1400010a0")
        offset: Pagination offset (default: 0)
        limit: Maximum number of references to return (default: 100)
    Returns:
        List of references to the specified address
    """
    query = dict(address=address, offset=offset, limit=limit)
    return safe_get("xrefs_to", query)
@mcp.tool()
def get_xrefs_from(address: str, offset: int = 0, limit: int = 100) -> list:
    """
    Return the references that originate at the given address (xref from).

    Args:
        address: Source address in hex format (e.g. "0x1400010a0")
        offset: Pagination offset (default: 0)
        limit: Maximum number of references to return (default: 100)
    Returns:
        List of references from the specified address
    """
    query = dict(address=address, offset=offset, limit=limit)
    return safe_get("xrefs_from", query)
@mcp.tool()
def get_function_xrefs(name: str, offset: int = 0, limit: int = 100) -> list:
    """
    Return the references to a function identified by name.

    Args:
        name: Function name to search for
        offset: Pagination offset (default: 0)
        limit: Maximum number of references to return (default: 100)
    Returns:
        List of references to the specified function
    """
    query = dict(name=name, offset=offset, limit=limit)
    return safe_get("function_xrefs", query)
@mcp.tool()
def find_uses_of_struct_member(struct_name: str, member_name: str, offset: int = 0, limit: int = 100) -> list:
    """
    Locate every use of one structure member by analyzing decompiled code.

    The search covers all functions and picks up member accesses through function
    parameters, local variables, and dynamic allocations - not just global typed data.

    Args:
        struct_name: Name of the structure containing the member
        member_name: Name of the member within the structure
        offset: Pagination offset (default: 0)
        limit: Maximum number of uses to return (default: 100)
    Returns:
        List of uses of the specified struct member with context including:
        - Function name and address where the use occurs
        - Line number in decompiled code
        - The actual source line containing the member access
        - Access pattern (dot notation, arrow notation, or dereference)
    Note:
        This method decompiles all functions in the program with extended timeouts
        to find member uses, so it may take considerable time for large programs.
        The operation has a 1-minute timeout limit. It finds uses through:
        - Direct member access: variable.member_name
        - Pointer member access: variable->member_name
        - Dereference access: (*variable).member_name
    """
    query = dict(
        struct_name=struct_name,
        member_name=member_name,
        offset=offset,
        limit=limit,
    )
    # This analysis can run far longer than an ordinary request, so the
    # configured default timeout is replaced with a fixed one-minute limit.
    return safe_get("find_uses_of_struct_member", query, timeout=60)
@mcp.tool()
def list_strings(offset: int = 0, limit: int = 2000, filter: str | None = None) -> list:
    """
    List all defined strings in the program with their addresses.

    Args:
        offset: Pagination offset (default: 0)
        limit: Maximum number of strings to return (default: 2000)
        filter: Optional filter to match within string content. ``None`` or an
            empty string means no filtering; the parameter is then left out of
            the request entirely.
    Returns:
        List of strings with their addresses
    """
    # NOTE: `filter` shadows the builtin, but it is part of the published tool
    # interface and therefore keeps its name.
    params = {"offset": offset, "limit": limit}
    if filter:
        params["filter"] = filter
    return safe_get("strings", params)
@mcp.tool()
def create_struct(name: str, size: int = 0, members: list | None = None) -> str:
    """
    Create a new structure with the exact size specified.

    Args:
        name: The name of the new structure.
        size: The exact size of the structure in bytes. The final struct will be exactly this size.
        members: A list of member dictionaries to define within the struct boundaries.
            Each dict should have 'name', 'type', and optionally 'offset' and 'comment'.
            The 'type' should be a builtin C datatype or a structure name defined in Ghidra data type manager.
            Pointers are specified with asterisk, e.g. void*, int* or PCSTR, PVOID for Windows types
            Example: [{"name": "field1", "type": "int", "offset": 0, "comment": "my field"}]
            When 'offset' is specified, the member is defined at that specific offset within the struct.
            When 'offset' is not specified, the member is appended, which may expand the struct size.
            ``None`` or an empty list sends no members.
    Returns:
        A status message indicating success or failure.
    Note:
        The resulting struct will be exactly the specified 'size'. Members with offsets are defined
        within the struct boundaries without expanding its size. This is ideal for reverse engineering
        where you know the struct size from malloc analysis and want to define members within it.
    """
    # The size is sent as a string and the member list as a JSON document,
    # because the payload is posted as flat form fields.
    data = {"name": name, "size": str(size)}
    if members:
        data["members"] = json.dumps(members)
    return safe_post("create_struct", data)
@mcp.tool()
def define_struct_member(struct_name: str, member_name: str, member_type: str, offset: int, comment: str | None = None) -> str:
    """
    Define a member in an existing structure at a specified offset.

    This method defines what's at specific offsets within a predetermined struct size.
    This is designed for reverse engineering workflows where struct size is predetermined
    and members are discovered during analysis.

    Args:
        struct_name: The name of the structure to modify.
        member_name: The name of the member to define at the specified offset.
        member_type: The type of the member (builtin C datatype or structure name defined in Ghidra data type manager).
            Pointers are specified with asterisk, e.g. void*, int* or PCSTR, PVOID for Windows types.
        offset: The byte offset within the structure where this member should be placed.
        comment: Optional comment for the member. ``None`` or an empty string
            leaves the comment out of the request.
    Returns:
        A status message indicating success or failure.
    Note:
        - If no member exists at the specified offset, a new member will be defined there.
        - If a member already exists at the offset, it will be replaced with the new definition.
        - The struct size will not change - this only defines members within existing struct boundaries.
    """
    member = {
        "name": member_name,
        "type": member_type,
        "offset": offset,
    }
    if comment:
        member["comment"] = comment
    # The member description travels as a JSON document inside one form field.
    data = {
        "struct_name": struct_name,
        "member": json.dumps(member),
    }
    return safe_post("define_struct_member", data)
@mcp.tool()
def remove_struct(struct_name: str) -> str:
    """
    Delete a whole structure from the data type manager.

    Args:
        struct_name: The name of the structure to remove.
    Returns:
        A status message indicating success or failure.
    """
    return safe_post("remove_struct", dict(struct_name=struct_name))
@mcp.tool()
def undefine_struct_member(struct_name: str, member_name: str) -> str:
    """
    Undefine one named member of an existing structure while preserving the struct layout.

    This is the counterpart to define_struct_member for cleaning up struct definitions.
    The member will be cleared from its offset without affecting other components.

    Args:
        struct_name: The name of the structure to modify.
        member_name: The name of the member to undefine from the structure.
    Returns:
        A status message indicating success or failure.
    Note:
        - The member is cleared from the structure definition at its offset.
        - The struct size and placement of other components are preserved (no shifting).
        - Cleared areas will not appear in get_struct results, consistent with never-defined fields.
        - Use this to clean up incorrect member definitions while maintaining struct boundaries.
    """
    payload = dict(struct_name=struct_name, member_name=member_name)
    return safe_post("undefine_struct_member", payload)
@mcp.tool()
def resize_struct(struct_name: str, new_size: int) -> str:
    """
    Change the size of an existing structure.

    This is useful when initial size estimates from malloc analysis prove inaccurate.

    Args:
        struct_name: The name of the structure to resize.
        new_size: The new size for the structure in bytes.
    Returns:
        A status message indicating success or failure.
    """
    # The size is converted to a string before being posted.
    size_field = str(new_size)
    payload = dict(struct_name=struct_name, new_size=size_field)
    return safe_post("resize_struct", payload)
@mcp.tool()
def rename_struct(struct_name: str, new_name: str) -> str:
    """
    Give an existing structure in the data type manager a new name.

    This is useful for giving structures more meaningful names during analysis.

    Note: Ghidra automatically updates all references and usages of the structure
    throughout the program when it is renamed. You do not need to manually update
    any code or data that uses this structure - the rename is handled transparently.

    Args:
        struct_name: The current name of the structure to rename.
        new_name: The new name for the structure.
    Returns:
        A status message indicating success or failure.
    """
    payload = dict(struct_name=struct_name, new_name=new_name)
    return safe_post("rename_struct", payload)
@mcp.tool()
def get_struct(name: str) -> dict:
    """
    Fetch the definition of a struct.

    Args:
        name: The name of the structure.
    Returns:
        The parsed JSON document describing the struct. If the server reply is
        not valid JSON (most likely an error message), the raw text is returned
        as {"error": <text>} instead.
    """
    raw = "\n".join(safe_get("get_struct", {"name": name}))
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Not JSON, so treat the reply as an error message.
        return {"error": raw}
    return parsed
@mcp.tool()
def get_data_by_label(label: str) -> str:
    """
    Look up information about a data label.

    Args:
        label: Exact symbol / label name to look up in the program.
    Returns:
        A newline-separated string.
        Each line has: "<label> -> <address> : <value-representation>"
        If the label is not found, an explanatory message is returned.
    """
    lines = safe_get("get_data_by_label", {"label": label})
    return "\n".join(lines)
@mcp.tool()
def get_bytes(address: str, size: int = 1) -> str:
    """
    Read raw bytes from memory and return them as a hex dump.

    Args:
        address: Start address in hex notation (e.g. "0x1401003A0").
        size: Number of bytes to read (default: 1).
    Returns:
        A hexdump-style multiline string.
        Format: "<address> <16-byte hex sequence…>".
        On error (invalid address / size ≤ 0) an error message is returned.
    """
    query = dict(address=address, size=size)
    lines = safe_get("get_bytes", query)
    return "\n".join(lines)
@mcp.tool()
def search_bytes(bytes_hex: str, offset: int = 0, limit: int = 100) -> list:
    """
    Search the whole program for a given byte sequence.

    Args:
        bytes_hex: Byte sequence encoded as a hex string
            (e.g. "DEADBEEF" or "DE AD BE EF").
        offset: Pagination offset for results (default: 0).
        limit: Maximum number of hit addresses to return (default: 100).
    Returns:
        A list of addresses (as hex strings) where the sequence was found,
        subject to pagination. If no hits, an explanatory message list
        such as ["No matches found"] is returned.
    """
    # The hex string is sent under the "bytes" query key.
    query = {"bytes": bytes_hex, "offset": offset, "limit": limit}
    return safe_get("search_bytes", query)
def main():
    """
    Parse command-line options, configure the bridge, and run the MCP server.

    Updates the module-level ``ghidra_server_url`` and ``ghidra_request_timeout``
    from the command line, then starts the FastMCP server on the selected
    transport. For "sse" it also sets up INFO-level logging and the MCP
    host/port (defaults 127.0.0.1 and 8081); otherwise it runs with the
    default transport. A KeyboardInterrupt during an "sse" run is logged and
    swallowed so Ctrl-C exits cleanly.
    """
    parser = argparse.ArgumentParser(description="MCP server for Ghidra")
    parser.add_argument("--ghidra-server", type=str, default=DEFAULT_GHIDRA_SERVER,
                        help=f"Ghidra server URL, default: {DEFAULT_GHIDRA_SERVER}")
    parser.add_argument("--mcp-host", type=str, default="127.0.0.1",
                        help="Host to run MCP server on (only used for sse), default: 127.0.0.1")
    parser.add_argument("--mcp-port", type=int,
                        help="Port to run MCP server on (only used for sse), default: 8081")
    parser.add_argument("--transport", type=str, default="stdio", choices=["stdio", "sse"],
                        help="Transport protocol for MCP, default: stdio")
    parser.add_argument("--ghidra-timeout", type=int, default=DEFAULT_REQUEST_TIMEOUT,
                        help=f"Ghidra request timeout in seconds, default: {DEFAULT_REQUEST_TIMEOUT}")
    args = parser.parse_args()

    # Use the global variables to ensure they are properly updated
    global ghidra_server_url, ghidra_request_timeout
    if args.ghidra_server:
        server = args.ghidra_server
        # urljoin() replaces the last path segment of a base URL that has no
        # trailing slash ("http://host/api" + "methods" -> "http://host/methods"),
        # so normalize the base URL to end with "/".
        ghidra_server_url = server if server.endswith("/") else server + "/"
    if args.ghidra_timeout:
        ghidra_request_timeout = args.ghidra_timeout

    if args.transport == "sse":
        try:
            # Set up logging
            log_level = logging.INFO
            logging.basicConfig(level=log_level)
            logging.getLogger().setLevel(log_level)
            # Configure MCP settings
            mcp.settings.log_level = "INFO"
            mcp.settings.host = args.mcp_host or "127.0.0.1"
            mcp.settings.port = args.mcp_port or 8081
            logger.info("Connecting to Ghidra server at %s", ghidra_server_url)
            logger.info("Starting MCP server on http://%s:%s/sse",
                        mcp.settings.host, mcp.settings.port)
            logger.info("Using transport: %s", args.transport)
            mcp.run(transport="sse")
        except KeyboardInterrupt:
            logger.info("Server stopped by user")
    else:
        mcp.run()
# Start the server only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()