from itertools import islice
import struct
from typing import Annotated, Optional
import ida_hexrays
import ida_lines
import ida_funcs
import idaapi
import idautils
import ida_typeinf
import ida_nalt
import ida_bytes
import ida_ida
import ida_entry
import ida_idaapi
import ida_xref
import ida_ua
import ida_name
from .rpc import tool
from .sync import idasync, tool_timeout
from .utils import (
parse_address,
normalize_list_input,
get_function,
get_prototype,
get_stack_frame_variables_internal,
decompile_function_safe,
get_assembly_lines,
get_all_xrefs,
get_all_comments,
Function,
Argument,
DisassemblyFunction,
Xref,
BasicBlock,
StructFieldQuery,
InsnPattern,
)
# ============================================================================
# Instruction Helpers
# ============================================================================
_IMM_SCAN_BACK_MAX = 15  # max back-scan distance for an immediate's instruction start (15 = longest x86 instruction)
def _decode_insn_at(ea: int) -> ida_ua.insn_t | None:
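    """Decode the instruction at ``ea``, returning None if decoding fails."""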
insn = ida_ua.insn_t()
if ida_ua.decode_insn(insn, ea) == 0:
return None
return insn
def _next_head(ea: int, end_ea: int) -> int:
return ida_bytes.next_head(ea, end_ea)
def _operand_value(insn: ida_ua.insn_t, i: int) -> int | None:
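    """Return the numeric value of operand ``i``: the target address for
    memory/near/far operands, the raw value otherwise, or None for o_void."""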
op = insn.ops[i]
if op.type == ida_ua.o_void:
return None
if op.type in (ida_ua.o_mem, ida_ua.o_far, ida_ua.o_near):
return op.addr
return op.value
def _operand_type(insn: ida_ua.insn_t, i: int) -> int:
return insn.ops[i].type
def _insn_mnem(insn: ida_ua.insn_t) -> str:
try:
return insn.get_canon_mnem().lower()
except Exception:
return ""
def _value_to_le_bytes(value: int) -> tuple[bytes, int, int] | None:
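    """Encode ``value`` as little-endian bytes, widening to 4 or 8 bytes as
    needed; returns (packed_bytes, size, normalized_value), or None if the
    value does not fit in 64 bits."""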
if value < 0:
if value >= -0x80000000:
size = 4
value &= 0xFFFFFFFF
elif value >= -0x8000000000000000:
size = 8
value &= 0xFFFFFFFFFFFFFFFF
else:
return None
else:
if value <= 0xFFFFFFFF:
size = 4
elif value <= 0xFFFFFFFFFFFFFFFF:
size = 8
else:
return None
fmt = "<I" if size == 4 else "<Q"
return struct.pack(fmt, value), size, value
def _value_candidates_for_immediate(value: int) -> list[tuple[int, int, bytes]]:
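    """Build (normalized_value, size, little_endian_bytes) search candidates
    for an immediate, as both a 32-bit and a 64-bit encoding whenever the
    value fits the respective signed range."""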
candidates: list[tuple[int, int, bytes]] = []
def add(size: int, signed_val: int):
if size == 4:
masked = signed_val & 0xFFFFFFFF
if not (-0x80000000 <= signed_val <= 0x7FFFFFFF):
return
b = struct.pack("<I", masked)
else:
masked = signed_val & 0xFFFFFFFFFFFFFFFF
if not (-0x8000000000000000 <= signed_val <= 0x7FFFFFFFFFFFFFFF):
return
b = struct.pack("<Q", masked)
candidates.append((masked, size, b))
add(4, value)
add(8, value)
return candidates
def _resolve_immediate_insn_start(
match_ea: int,
value: int,
seg_start: int,
alt_value: int | None = None,
) -> int | None:
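    """Walk back up to _IMM_SCAN_BACK_MAX bytes from a raw byte match and
    return the start of an instruction whose immediate operand equals
    ``value`` (or ``alt_value``), or None if no such instruction is found."""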
start_min = max(seg_start, match_ea - _IMM_SCAN_BACK_MAX)
for start in range(match_ea, start_min - 1, -1):
insn = _decode_insn_at(start)
if insn is None:
continue
end_ea = start + insn.size
if not (start <= match_ea < end_ea):
continue
for i in range(8):
op_type = _operand_type(insn, i)
if op_type == ida_ua.o_void:
break
if op_type != ida_ua.o_imm:
continue
op_val = _operand_value(insn, i)
if op_val is None:
continue
if op_val == value or (alt_value is not None and op_val == alt_value):
offb = getattr(insn.ops[i], "offb", 0)
if offb and start + offb != match_ea:
continue
return start
return None
# ============================================================================
# Code Analysis & Decompilation
# ============================================================================
@tool
@idasync
@tool_timeout(90.0)
def decompile(
addr: Annotated[str, "Function address to decompile"],
) -> dict:
"""Decompile function to pseudocode"""
try:
start = parse_address(addr)
code = decompile_function_safe(start)
if code is None:
return {"addr": addr, "code": None, "error": "Decompilation failed"}
return {"addr": addr, "code": code}
except Exception as e:
return {"addr": addr, "code": None, "error": str(e)}
@tool
@idasync
@tool_timeout(90.0)
def disasm(
addr: Annotated[str, "Function address to disassemble"],
max_instructions: Annotated[
int, "Max instructions per function (default: 5000, max: 50000)"
] = 5000,
offset: Annotated[int, "Skip first N instructions (default: 0)"] = 0,
include_total: Annotated[
bool, "Compute total instruction count (default: false)"
] = False,
) -> dict:
"""Disassemble function to assembly instructions"""
# Enforce max limit
if max_instructions <= 0 or max_instructions > 50000:
max_instructions = 50000
if offset < 0:
offset = 0
try:
start = parse_address(addr)
func = idaapi.get_func(start)
# Get segment info
seg = idaapi.getseg(start)
if not seg:
return {
"addr": addr,
"asm": None,
"error": "No segment found",
"cursor": {"done": True},
}
        segment_name = idaapi.get_segm_name(seg) or "UNKNOWN"
if func:
# Function exists: disassemble function items starting from requested address
func_name: str = ida_funcs.get_func_name(func.start_ea) or "<unnamed>"
header_addr = start # Use requested address, not function start
else:
# No function: disassemble sequentially from start address
func_name = "<no function>"
header_addr = start
lines = []
seen = 0
total_count = 0
more = False
def _maybe_add(ea: int) -> bool:
nonlocal seen, total_count, more
if include_total:
total_count += 1
if seen < offset:
seen += 1
return True
if len(lines) < max_instructions:
line = ida_lines.generate_disasm_line(ea, 0)
instruction = ida_lines.tag_remove(line) if line else ""
lines.append(f"{ea:x} {instruction}")
seen += 1
return True
more = True
seen += 1
return include_total
if func:
for ea in idautils.FuncItems(func.start_ea):
if ea == idaapi.BADADDR:
continue
if ea < start:
continue
if not _maybe_add(ea):
break
else:
ea = start
while ea < seg.end_ea:
if ea == idaapi.BADADDR:
break
if _decode_insn_at(ea) is None:
break
if not _maybe_add(ea):
break
ea = _next_head(ea, seg.end_ea)
if ea == idaapi.BADADDR:
break
if include_total and not more:
more = total_count > offset + max_instructions
lines_str = f"{func_name} ({segment_name} @ {hex(header_addr)}):"
if lines:
lines_str += "\n" + "\n".join(lines)
rettype = None
args: Optional[list[Argument]] = None
stack_frame = None
if func:
tif = ida_typeinf.tinfo_t()
if ida_nalt.get_tinfo(tif, func.start_ea) and tif.is_func():
ftd = ida_typeinf.func_type_data_t()
if tif.get_func_details(ftd):
rettype = str(ftd.rettype)
args = [
Argument(name=(a.name or f"arg{i}"), type=str(a.type))
for i, a in enumerate(ftd)
]
stack_frame = get_stack_frame_variables_internal(func.start_ea, False)
out: DisassemblyFunction = {
"name": func_name,
"start_ea": hex(header_addr),
"lines": lines_str,
}
if stack_frame:
out["stack_frame"] = stack_frame
if rettype:
out["return_type"] = rettype
if args is not None:
out["arguments"] = args
return {
"addr": addr,
"asm": out,
"instruction_count": len(lines),
"total_instructions": total_count if include_total else None,
"cursor": (
{"next": offset + max_instructions}
if more
else {"done": True}
),
}
except Exception as e:
return {
"addr": addr,
"asm": None,
"error": str(e),
"cursor": {"done": True},
}
# ============================================================================
# Cross-Reference Analysis
# ============================================================================
@tool
@idasync
def xrefs_to(
addrs: Annotated[list[str] | str, "Addresses to find cross-references to"],
limit: Annotated[int, "Max xrefs per address (default: 100, max: 1000)"] = 100,
) -> list[dict]:
"""Get cross-references to specified addresses"""
addrs = normalize_list_input(addrs)
if limit <= 0 or limit > 1000:
limit = 1000
results = []
for addr in addrs:
try:
xrefs = []
more = False
for xref in idautils.XrefsTo(parse_address(addr)):
if len(xrefs) >= limit:
more = True
break
xrefs.append(
Xref(
addr=hex(xref.frm),
type="code" if xref.iscode else "data",
fn=get_function(xref.frm, raise_error=False),
)
)
results.append({"addr": addr, "xrefs": xrefs, "more": more})
except Exception as e:
results.append({"addr": addr, "xrefs": None, "error": str(e)})
return results
@tool
@idasync
def xrefs_to_field(queries: list[StructFieldQuery] | StructFieldQuery) -> list[dict]:
"""Get cross-references to structure fields"""
if isinstance(queries, dict):
queries = [queries]
results = []
til = ida_typeinf.get_idati()
if not til:
return [
{
"struct": q.get("struct"),
"field": q.get("field"),
"xrefs": [],
"error": "Failed to retrieve type library",
}
for q in queries
]
for query in queries:
struct_name = query.get("struct", "")
field_name = query.get("field", "")
try:
tif = ida_typeinf.tinfo_t()
if not tif.get_named_type(
til, struct_name, ida_typeinf.BTF_STRUCT, True, False
):
results.append(
{
"struct": struct_name,
"field": field_name,
"xrefs": [],
"error": f"Struct '{struct_name}' not found",
}
)
continue
idx = ida_typeinf.get_udm_by_fullname(None, struct_name + "." + field_name)
if idx == -1:
results.append(
{
"struct": struct_name,
"field": field_name,
"xrefs": [],
"error": f"Field '{field_name}' not found in '{struct_name}'",
}
)
continue
tid = tif.get_udm_tid(idx)
if tid == ida_idaapi.BADADDR:
results.append(
{
"struct": struct_name,
"field": field_name,
"xrefs": [],
"error": "Unable to get tid",
}
)
continue
xrefs = []
xref: ida_xref.xrefblk_t
for xref in idautils.XrefsTo(tid):
                xrefs.append(
                    Xref(
                        addr=hex(xref.frm),
                        type="code" if xref.iscode else "data",
                        fn=get_function(xref.frm, raise_error=False),
                    )
                )
results.append({"struct": struct_name, "field": field_name, "xrefs": xrefs})
except Exception as e:
results.append(
{
"struct": struct_name,
"field": field_name,
"xrefs": [],
"error": str(e),
}
)
return results
# ============================================================================
# Call Graph Analysis
# ============================================================================
@tool
@idasync
def callees(
addrs: Annotated[list[str] | str, "Function addresses to get callees for"],
limit: Annotated[int, "Max callees per function (default: 200, max: 500)"] = 200,
) -> list[dict]:
"""Get functions called by the specified functions"""
addrs = normalize_list_input(addrs)
if limit <= 0 or limit > 500:
limit = 500
results = []
for fn_addr in addrs:
try:
func_start = parse_address(fn_addr)
func = idaapi.get_func(func_start)
if not func:
results.append(
{"addr": fn_addr, "callees": None, "error": "No function found"}
)
continue
func_end = func.end_ea
callees_dict = {}
more = False
current_ea = func_start
while current_ea < func_end:
if len(callees_dict) >= limit:
more = True
break
insn = _decode_insn_at(current_ea)
if insn is None:
next_ea = _next_head(current_ea, func_end)
if next_ea == idaapi.BADADDR:
break
current_ea = next_ea
continue
                # NN_call* are x86 (metapc) instruction codes; other processors
                # would need a check such as ida_idp.is_call_insn(insn) instead.
                if insn.itype in (idaapi.NN_call, idaapi.NN_callfi, idaapi.NN_callni):
op0 = insn.ops[0]
if op0.type in (ida_ua.o_mem, ida_ua.o_near, ida_ua.o_far):
target = op0.addr
elif op0.type == ida_ua.o_imm:
target = op0.value
else:
target = None
if target is not None and target not in callees_dict:
func_type = (
"internal"
if idaapi.get_func(target) is not None
else "external"
)
func_name = ida_name.get_name(target)
if func_name is not None:
callees_dict[target] = {
"addr": hex(target),
"name": func_name,
"type": func_type,
}
next_ea = _next_head(current_ea, func_end)
if next_ea == idaapi.BADADDR:
break
current_ea = next_ea
results.append({
"addr": fn_addr,
"callees": list(callees_dict.values()),
"more": more,
})
except Exception as e:
results.append({"addr": fn_addr, "callees": None, "error": str(e)})
return results
# ============================================================================
# Pattern Matching & Signature Tools
# ============================================================================
@tool
@idasync
def find_bytes(
patterns: Annotated[
list[str] | str, "Byte patterns to search for (e.g. '48 8B ?? ??')"
],
limit: Annotated[int, "Max matches per pattern (default: 1000, max: 10000)"] = 1000,
offset: Annotated[int, "Skip first N matches (default: 0)"] = 0,
) -> list[dict]:
"""Search for byte patterns in the binary (supports wildcards with ??)"""
patterns = normalize_list_input(patterns)
# Enforce max limit
if limit <= 0 or limit > 10000:
limit = 10000
results = []
for pattern in patterns:
matches = []
skipped = 0
more = False
try:
# Parse the pattern
compiled = ida_bytes.compiled_binpat_vec_t()
err = ida_bytes.parse_binpat_str(
compiled, ida_ida.inf_get_min_ea(), pattern, 16
)
if err:
results.append(
{
"pattern": pattern,
"matches": [],
"n": 0,
"cursor": {"done": True},
}
)
continue
# Search with early exit
ea = ida_ida.inf_get_min_ea()
max_ea = ida_ida.inf_get_max_ea()
while ea != idaapi.BADADDR:
ea = ida_bytes.bin_search(
ea, max_ea, compiled, ida_bytes.BIN_SEARCH_FORWARD
)
if ea != idaapi.BADADDR:
if skipped < offset:
skipped += 1
else:
matches.append(hex(ea))
if len(matches) >= limit:
# Check if there's more
next_ea = ida_bytes.bin_search(
ea + 1, max_ea, compiled, ida_bytes.BIN_SEARCH_FORWARD
)
more = next_ea != idaapi.BADADDR
break
ea += 1
except Exception:
pass
results.append(
{
"pattern": pattern,
"matches": matches,
"n": len(matches),
"cursor": {"next": offset + limit} if more else {"done": True},
}
)
return results
# ============================================================================
# Control Flow Analysis
# ============================================================================
@tool
@idasync
def basic_blocks(
addrs: Annotated[list[str] | str, "Function addresses to get basic blocks for"],
max_blocks: Annotated[
int, "Max basic blocks per function (default: 1000, max: 10000)"
] = 1000,
offset: Annotated[int, "Skip first N blocks (default: 0)"] = 0,
) -> list[dict]:
"""Get control flow graph basic blocks for functions"""
addrs = normalize_list_input(addrs)
    # Enforce limits
    if max_blocks <= 0 or max_blocks > 10000:
        max_blocks = 10000
    if offset < 0:
        offset = 0
results = []
for fn_addr in addrs:
try:
ea = parse_address(fn_addr)
func = idaapi.get_func(ea)
if not func:
results.append(
{
"addr": fn_addr,
"error": "Function not found",
"blocks": [],
"cursor": {"done": True},
}
)
continue
flowchart = idaapi.FlowChart(func)
all_blocks = []
for block in flowchart:
all_blocks.append(
BasicBlock(
start=hex(block.start_ea),
end=hex(block.end_ea),
size=block.end_ea - block.start_ea,
type=block.type,
successors=[hex(succ.start_ea) for succ in block.succs()],
predecessors=[hex(pred.start_ea) for pred in block.preds()],
)
)
# Apply pagination
total_blocks = len(all_blocks)
blocks = all_blocks[offset : offset + max_blocks]
more = offset + max_blocks < total_blocks
results.append(
{
"addr": fn_addr,
"blocks": blocks,
"count": len(blocks),
"total_blocks": total_blocks,
"cursor": (
{"next": offset + max_blocks} if more else {"done": True}
),
"error": None,
}
)
except Exception as e:
results.append(
{
"addr": fn_addr,
"error": str(e),
"blocks": [],
"cursor": {"done": True},
}
)
return results
# ============================================================================
# Search Operations
# ============================================================================
@tool
@idasync
def find(
type: Annotated[
str, "Search type: 'string', 'immediate', 'data_ref', or 'code_ref'"
],
targets: Annotated[
list[str | int] | str | int, "Search targets (strings, integers, or addresses)"
],
limit: Annotated[int, "Max matches per target (default: 1000, max: 10000)"] = 1000,
offset: Annotated[int, "Skip first N matches (default: 0)"] = 0,
) -> list[dict]:
"""Search for patterns in the binary (strings, immediate values, or references)"""
if not isinstance(targets, list):
targets = [targets]
# Enforce max limit to prevent token overflow
if limit <= 0 or limit > 10000:
limit = 10000
results = []
if type == "string":
# Raw byte search for UTF-8 substrings across the binary
for pattern in targets:
pattern_str = str(pattern)
pattern_bytes = pattern_str.encode("utf-8")
if not pattern_bytes:
results.append(
{
"query": pattern_str,
"matches": [],
"count": 0,
"cursor": {"done": True},
"error": "Empty pattern",
}
)
continue
matches = []
skipped = 0
more = False
try:
ea = ida_ida.inf_get_min_ea()
max_ea = ida_ida.inf_get_max_ea()
mask = b"\xFF" * len(pattern_bytes)
flags = ida_bytes.BIN_SEARCH_FORWARD | ida_bytes.BIN_SEARCH_NOSHOW
while ea != idaapi.BADADDR:
ea = ida_bytes.bin_search(
ea, max_ea, pattern_bytes, mask, len(pattern_bytes), flags
)
if ea != idaapi.BADADDR:
if skipped < offset:
skipped += 1
else:
matches.append(hex(ea))
if len(matches) >= limit:
next_ea = ida_bytes.bin_search(
ea + 1,
max_ea,
pattern_bytes,
mask,
len(pattern_bytes),
flags,
)
more = next_ea != idaapi.BADADDR
break
ea += 1
except Exception:
pass
results.append(
{
"query": pattern_str,
"matches": matches,
"count": len(matches),
"cursor": {"next": offset + limit} if more else {"done": True},
"error": None,
}
)
elif type == "immediate":
# Search for immediate values
for value in targets:
            if isinstance(value, str):
                try:
                    value = int(value, 0)
                except ValueError:
                    results.append(
                        {"query": value, "matches": [], "count": 0,
                         "cursor": {"done": True}, "error": "Invalid immediate value"}
                    )
                    continue
matches = []
skipped = 0
more = False
try:
candidates = _value_candidates_for_immediate(value)
if not candidates:
results.append(
{
"query": value,
"matches": [],
"count": 0,
"cursor": {"done": True},
"error": "Immediate out of range",
}
)
continue
seen_insn = set()
for seg_ea in idautils.Segments():
seg = idaapi.getseg(seg_ea)
if not seg or not (seg.perm & idaapi.SEGPERM_EXEC):
continue
for normalized, size, pattern_bytes in candidates:
ea = seg.start_ea
while ea != idaapi.BADADDR and ea < seg.end_ea:
ea = ida_bytes.bin_search(
ea, seg.end_ea, pattern_bytes, b"\xFF" * size, size, ida_bytes.BIN_SEARCH_FORWARD
)
if ea == idaapi.BADADDR:
break
insn_start = _resolve_immediate_insn_start(
ea, value, seg.start_ea, normalized
)
if insn_start is not None and insn_start not in seen_insn:
seen_insn.add(insn_start)
if skipped < offset:
skipped += 1
else:
matches.append(hex(insn_start))
if len(matches) >= limit:
more = True
break
ea += 1
if more:
break
if more:
break
except Exception:
pass
results.append(
{
"query": value,
"matches": matches,
"count": len(matches),
"cursor": {"next": offset + limit} if more else {"done": True},
"error": None,
}
)
elif type == "data_ref":
# Find all data references to targets
for target_str in targets:
try:
target = parse_address(str(target_str))
gen = (hex(xref) for xref in idautils.DataRefsTo(target))
# Skip offset items, take limit+1 to check more
                matches = list(islice(gen, offset, offset + limit + 1))
more = len(matches) > limit
if more:
matches = matches[:limit]
results.append(
{
"query": str(target_str),
"matches": matches,
"count": len(matches),
"cursor": (
{"next": offset + limit} if more else {"done": True}
),
"error": None,
}
)
except Exception as e:
results.append(
{
"query": str(target_str),
"matches": [],
"count": 0,
"cursor": {"done": True},
"error": str(e),
}
)
elif type == "code_ref":
# Find all code references to targets
for target_str in targets:
try:
target = parse_address(str(target_str))
gen = (hex(xref) for xref in idautils.CodeRefsTo(target, 0))
# Skip offset items, take limit+1 to check more
                matches = list(islice(gen, offset, offset + limit + 1))
more = len(matches) > limit
if more:
matches = matches[:limit]
results.append(
{
"query": str(target_str),
"matches": matches,
"count": len(matches),
"cursor": (
{"next": offset + limit} if more else {"done": True}
),
"error": None,
}
)
except Exception as e:
results.append(
{
"query": str(target_str),
"matches": [],
"count": 0,
"cursor": {"done": True},
"error": str(e),
}
)
else:
results.append(
{
"query": None,
"matches": [],
"count": 0,
"cursor": {"done": True},
"error": f"Unknown search type: {type}",
}
)
return results
def _resolve_insn_scan_ranges(pattern: dict, allow_broad: bool) -> tuple[list[tuple[int, int]], str | None]:
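    """Translate a pattern's scope fields (func/segment/start/end) into
    executable address ranges to scan; returns (ranges, error). Without an
    explicit scope, all executable segments are scanned only when
    ``allow_broad`` is True."""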
func_addr = pattern.get("func")
segment_name = pattern.get("segment")
start_s = pattern.get("start")
end_s = pattern.get("end")
exec_segments = []
for seg_ea in idautils.Segments():
seg = idaapi.getseg(seg_ea)
if seg and (seg.perm & idaapi.SEGPERM_EXEC):
exec_segments.append(seg)
if func_addr is not None:
try:
ea = parse_address(func_addr)
func = idaapi.get_func(ea)
if not func:
return [], f"Function not found at {func_addr}"
return [(func.start_ea, func.end_ea)], None
except Exception as e:
return [], str(e)
if segment_name is not None:
for seg in exec_segments:
if idaapi.get_segm_name(seg) == segment_name:
return [(seg.start_ea, seg.end_ea)], None
return [], f"Executable segment not found: {segment_name}"
if start_s is not None or end_s is not None:
if start_s is None:
return [], "start is required when end is set"
try:
start_ea = parse_address(start_s)
end_ea = parse_address(end_s) if end_s is not None else None
except Exception as e:
return [], str(e)
if not exec_segments:
return [], "No executable segments found"
if end_ea is None:
seg = idaapi.getseg(start_ea)
if not seg or not (seg.perm & idaapi.SEGPERM_EXEC):
return [], "start address not in executable segment"
end_ea = seg.end_ea
if end_ea <= start_ea:
return [], "end must be greater than start"
ranges = []
for seg in exec_segments:
seg_start = max(seg.start_ea, start_ea)
seg_end = min(seg.end_ea, end_ea)
if seg_end > seg_start:
ranges.append((seg_start, seg_end))
if not ranges:
return [], "No executable ranges within start/end"
return ranges, None
if not allow_broad:
return [], "Scope required: set func/segment/start/end or allow_broad=true"
if not exec_segments:
return [], "No executable segments found"
return [(seg.start_ea, seg.end_ea) for seg in exec_segments], None
def _scan_insn_ranges(
ranges: list[tuple[int, int]],
mnem: str,
op0_val: int | None,
op1_val: int | None,
op2_val: int | None,
any_val: int | None,
limit: int,
offset: int,
max_scan_insns: int,
) -> tuple[list[str], bool, int, bool, int | None]:
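    """Linearly scan instruction ranges for a mnemonic/operand match.

    Returns (matches, more, scanned, truncated, next_start): ``more`` is set
    when the match limit was hit, ``truncated``/``next_start`` when the
    per-call instruction budget (``max_scan_insns``) ran out first."""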
matches: list[str] = []
skipped = 0
scanned = 0
more = False
truncated = False
next_start: int | None = None
for start_ea, end_ea in ranges:
ea = start_ea
while ea < end_ea:
if scanned >= max_scan_insns:
truncated = True
next_start = ea
break
scanned += 1
insn = _decode_insn_at(ea)
if insn is None:
ea = _next_head(ea, end_ea)
if ea == idaapi.BADADDR:
break
continue
if mnem and _insn_mnem(insn) != mnem:
ea = _next_head(ea, end_ea)
if ea == idaapi.BADADDR:
break
continue
match = True
if op0_val is not None and _operand_value(insn, 0) != op0_val:
match = False
if op1_val is not None and _operand_value(insn, 1) != op1_val:
match = False
if op2_val is not None and _operand_value(insn, 2) != op2_val:
match = False
if any_val is not None and match:
found_any = False
for i in range(8):
if _operand_type(insn, i) == ida_ua.o_void:
break
if _operand_value(insn, i) == any_val:
found_any = True
break
if not found_any:
match = False
if match:
if skipped < offset:
skipped += 1
else:
matches.append(hex(ea))
if len(matches) > limit:
more = True
matches = matches[:limit]
break
ea = _next_head(ea, end_ea)
if ea == idaapi.BADADDR:
break
if more or truncated:
break
return matches, more, scanned, truncated, next_start
# ============================================================================
# Export Operations
# ============================================================================
@tool
@idasync
def export_funcs(
addrs: Annotated[list[str] | str, "Function addresses to export"],
format: Annotated[
str, "Export format: json (default), c_header, or prototypes"
] = "json",
) -> dict:
"""Export function data in various formats"""
addrs = normalize_list_input(addrs)
results = []
for addr in addrs:
try:
ea = parse_address(addr)
func = idaapi.get_func(ea)
if not func:
results.append({"addr": addr, "error": "Function not found"})
continue
func_data = {
"addr": addr,
"name": ida_funcs.get_func_name(func.start_ea),
"prototype": get_prototype(func),
"size": hex(func.end_ea - func.start_ea),
"comments": get_all_comments(ea),
}
if format == "json":
func_data["asm"] = get_assembly_lines(ea)
func_data["code"] = decompile_function_safe(ea)
func_data["xrefs"] = get_all_xrefs(ea)
results.append(func_data)
except Exception as e:
results.append({"addr": addr, "error": str(e)})
if format == "c_header":
# Generate C header file
lines = ["// Auto-generated by IDA Pro MCP", ""]
for func in results:
if "prototype" in func and func["prototype"]:
lines.append(f"{func['prototype']};")
return {"format": "c_header", "content": "\n".join(lines)}
elif format == "prototypes":
# Just prototypes
prototypes = []
for func in results:
if "prototype" in func and func["prototype"]:
prototypes.append(
{"name": func.get("name"), "prototype": func["prototype"]}
)
return {"format": "prototypes", "functions": prototypes}
return {"format": "json", "functions": results}
# ============================================================================
# Graph Operations
# ============================================================================
@tool
@idasync
def callgraph(
roots: Annotated[
list[str] | str, "Root function addresses to start call graph traversal from"
],
max_depth: Annotated[int, "Maximum depth for call graph traversal"] = 5,
max_nodes: Annotated[int, "Max nodes across the graph (default: 1000, max: 100000)"] = 1000,
max_edges: Annotated[int, "Max edges across the graph (default: 5000, max: 200000)"] = 5000,
max_edges_per_func: Annotated[
int, "Max edges per function (default: 200, max: 5000)"
] = 200,
) -> list[dict]:
"""Build call graph starting from root functions"""
roots = normalize_list_input(roots)
if max_depth < 0:
max_depth = 0
if max_nodes <= 0 or max_nodes > 100000:
max_nodes = 100000
if max_edges <= 0 or max_edges > 200000:
max_edges = 200000
if max_edges_per_func <= 0 or max_edges_per_func > 5000:
max_edges_per_func = 5000
results = []
for root in roots:
try:
ea = parse_address(root)
func = idaapi.get_func(ea)
if not func:
results.append(
{
"root": root,
"error": "Function not found",
"nodes": [],
"edges": [],
}
)
continue
nodes = {}
edges = []
visited = set()
truncated = False
per_func_capped = False
limit_reason = None
def hit_limit(reason: str):
nonlocal truncated, limit_reason
truncated = True
limit_reason = reason
def traverse(addr, depth):
nonlocal per_func_capped
if truncated:
return
if depth > max_depth or addr in visited:
return
if len(nodes) >= max_nodes:
hit_limit("nodes")
return
visited.add(addr)
f = idaapi.get_func(addr)
if not f:
return
func_name = ida_funcs.get_func_name(f.start_ea)
nodes[hex(addr)] = {
"addr": hex(addr),
"name": func_name,
"depth": depth,
}
# Get callees
edges_added = 0
for item_ea in idautils.FuncItems(f.start_ea):
if truncated:
break
for xref in idautils.CodeRefsFrom(item_ea, 0):
if truncated:
break
if edges_added >= max_edges_per_func:
per_func_capped = True
break
                        callee_func = idaapi.get_func(xref)
                        # Count only refs that land on a function entry point, so
                        # intra-function jumps do not become spurious "call" edges.
                        if callee_func and xref == callee_func.start_ea:
if len(edges) >= max_edges:
hit_limit("edges")
break
edges.append(
{
"from": hex(addr),
"to": hex(callee_func.start_ea),
"type": "call",
}
)
edges_added += 1
traverse(callee_func.start_ea, depth + 1)
if edges_added >= max_edges_per_func:
break
traverse(ea, 0)
results.append(
{
"root": root,
"nodes": list(nodes.values()),
"edges": edges,
"max_depth": max_depth,
"truncated": truncated,
"limit_reason": limit_reason,
"max_nodes": max_nodes,
"max_edges": max_edges,
"max_edges_per_func": max_edges_per_func,
"per_func_capped": per_func_capped,
"error": None,
}
)
except Exception as e:
results.append({"root": root, "error": str(e), "nodes": [], "edges": []})
return results