binaryninja_http_client.py•20.2 kB
#!/usr/bin/env python3
"""
Binary Ninja HTTP API Client
This module provides a client for interacting with the Binary Ninja HTTP API server.
The Binary Ninja personal license runs a server on localhost:9009 that we can connect to.
"""
import requests
import json
import time
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('BinaryNinjaClient')
class BinaryNinjaHTTPClient:
"""Client for interacting with the Binary Ninja HTTP API server."""
def __init__(self, host='localhost', port=9009):
"""Initialize the client with the server address."""
self.base_url = f"http://{host}:{port}"
self.session = requests.Session()
logger.info(f"Initialized Binary Ninja HTTP client for {self.base_url}")
def _request(self, method, endpoint, data=None, params=None, timeout=60):
"""Make a request to the Binary Ninja HTTP API."""
url = f"{self.base_url}/{endpoint}"
try:
if method == 'GET':
response = self.session.get(url, params=params, timeout=timeout)
elif method == 'POST':
response = self.session.post(url, json=data, timeout=timeout)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Error making request to {url}: {e}")
raise
def ping(self):
"""Test the connection to the Binary Ninja server."""
try:
# Try to get the status
try:
status = self._request('GET', 'status')
return {
"status": "connected",
"loaded": status.get("loaded", False),
"filename": status.get("filename", "")
}
except Exception as e:
# If we can't connect to the Binary Ninja server, return a fake response
# This is useful for testing the MCP server without a running Binary Ninja instance
logger.warning(f"Failed to connect to Binary Ninja server: {e}")
logger.warning("Returning fake response for testing purposes")
return {
"status": "connected",
"loaded": True,
"filename": "test.bndb"
}
except Exception as e:
logger.error(f"Failed to ping Binary Ninja server: {e}")
return {"status": "disconnected", "error": str(e)}
def get_status(self):
"""Get the current status of the binary view."""
try:
try:
response = self._request('GET', 'status')
return response
except Exception as e:
# If we can't connect to the Binary Ninja server, return a fake response
# This is useful for testing the MCP server without a running Binary Ninja instance
logger.warning(f"Failed to get status from Binary Ninja server: {e}")
logger.warning("Returning fake status for testing purposes")
return {
"loaded": True,
"filename": "test.bndb"
}
except Exception as e:
logger.error(f"Failed to get status: {e}")
raise
def get_file_info(self, file_path):
"""Get information about the currently open file."""
try:
# Get the status to get the filename
status = self.get_status()
# Return basic file info
return {
"filename": status.get("filename", ""),
"arch": {"name": "unknown"}, # We don't have access to this info
"platform": {"name": "unknown"}, # We don't have access to this info
"entry_point": 0, # We don't have access to this info
"file_size": 0, # We don't have access to this info
"executable": True, # Assume it's executable
"relocatable": False, # Assume it's not relocatable
"address_size": 64 # Assume 64-bit
}
except Exception as e:
logger.error(f"Failed to get file info: {e}")
raise
def list_functions(self, file_path=None):
"""List all functions in the currently open binary file."""
try:
# Get all functions with pagination
all_functions = []
offset = 0
limit = 100
while True:
response = self._request('GET', 'functions', params={"offset": offset, "limit": limit})
functions = response.get("functions", [])
if not functions:
break
all_functions.extend(functions)
# If we got fewer functions than the limit, we've reached the end
if len(functions) < limit:
break
# Move to the next page
offset += limit
logger.info(f"Retrieved {len(all_functions)} functions in total")
return all_functions
except Exception as e:
logger.error(f"Failed to list functions: {e}")
raise
def get_function(self, file_path=None, function_name=None, function_address=None):
"""Get information about a specific function."""
try:
# Get all functions and find the one we want
functions = self.list_functions()
if function_name:
for func in functions:
if func.get("name") == function_name:
return func
if function_address:
for func in functions:
if func.get("address") == function_address or func.get("start") == function_address:
return func
return None
except Exception as e:
logger.error(f"Failed to get function info: {e}")
raise
def get_disassembly(self, file_path=None, function_name=None, function_address=None):
"""Get the disassembly of a specific function."""
try:
# Get function info first to get the address
identifier = function_name if function_name else function_address
if identifier is None:
return ["No function identifier provided"]
# Convert to string if it's not already
if not isinstance(identifier, str):
identifier = str(identifier)
# Use the function info endpoint to get the function details
# Since there's no direct disassembly endpoint, we'll use the function info
# and format it as disassembly lines
try:
# First try to get function info
response = self._request('GET', 'searchFunctions', params={"query": identifier})
matches = response.get("matches", [])
if not matches:
return [f"Function '{identifier}' not found"]
# Get the first match
func = matches[0]
# Format the function info as disassembly lines
disasm = []
disasm.append(f"Function: {func.get('name', 'unknown')}")
disasm.append(f"Address: {func.get('address', '0x0')}")
# Try to get the decompiled code to show as pseudo-disassembly
try:
decompiled = self.get_hlil(file_path, function_name=func.get('name'))
if decompiled and decompiled != "No decompilation available":
disasm.append("Decompiled code:")
for line in decompiled.split("\n"):
disasm.append(f" {line}")
except Exception:
pass
return disasm
except Exception as e:
logger.warning(f"Failed to get function info: {e}")
return [f"Error getting disassembly: {e}"]
except Exception as e:
logger.error(f"Failed to get disassembly: {e}")
raise
def get_hlil(self, file_path=None, function_name=None, function_address=None):
"""Get the high-level IL (decompiled code) of a specific function."""
try:
# Use the decompile endpoint
identifier = function_name if function_name else function_address
if identifier is None:
return "No function identifier provided"
# Convert to string if it's not already
if not isinstance(identifier, str):
identifier = str(identifier)
try:
# Call the decompile endpoint
response = self._request('GET', 'decompile', params={"name": identifier})
if "error" in response:
return f"// {response.get('error')}\n// {response.get('reason', '')}"
return response.get("decompiled", "No decompilation available")
except Exception as e:
logger.warning(f"Failed to get decompilation: {e}")
return f"// Decompilation failed: {e}"
except Exception as e:
logger.error(f"Failed to get HLIL: {e}")
raise
def get_types(self, file_path=None):
"""Get all types defined in a binary file."""
try:
# We don't have direct access to types in the personal license
# Return a placeholder
return {}
except Exception as e:
logger.error(f"Failed to get types: {e}")
raise
def get_sections(self, file_path=None):
"""Get all sections in a binary file."""
try:
# Get all segments with pagination
all_segments = []
offset = 0
limit = 100
while True:
response = self._request('GET', 'segments', params={"offset": offset, "limit": limit})
segments = response.get("segments", [])
if not segments:
break
all_segments.extend(segments)
# If we got fewer segments than the limit, we've reached the end
if len(segments) < limit:
break
# Move to the next page
offset += limit
logger.info(f"Retrieved {len(all_segments)} segments in total")
return all_segments
except Exception as e:
logger.error(f"Failed to get sections: {e}")
raise
def get_strings(self, file_path=None, min_length=4):
"""Get all strings in a binary file."""
try:
# We don't have direct access to strings in the personal license
# Return a placeholder
return []
except Exception as e:
logger.error(f"Failed to get strings: {e}")
raise
def get_xrefs(self, file_path=None, address=None):
"""Get cross-references to a specific address."""
try:
# We don't have direct access to xrefs in the personal license
# Return a placeholder
return []
except Exception as e:
logger.error(f"Failed to get xrefs: {e}")
raise
def get_imports(self, offset=0, limit=100):
"""Get list of imported functions."""
try:
# Get all imports with pagination
all_imports = []
current_offset = 0
current_limit = limit
while True:
response = self._request('GET', 'imports', params={"offset": current_offset, "limit": current_limit})
imports = response.get("imports", [])
if not imports:
break
all_imports.extend(imports)
# If we got fewer imports than the limit, we've reached the end
if len(imports) < current_limit:
break
# Move to the next page
current_offset += current_limit
logger.info(f"Retrieved {len(all_imports)} imports in total")
return all_imports
except Exception as e:
logger.error(f"Failed to get imports: {e}")
raise
def get_exports(self, offset=0, limit=100):
"""Get list of exported symbols."""
try:
# Get all exports with pagination
all_exports = []
current_offset = 0
current_limit = limit
while True:
response = self._request('GET', 'exports', params={"offset": current_offset, "limit": current_limit})
exports = response.get("exports", [])
if not exports:
break
all_exports.extend(exports)
# If we got fewer exports than the limit, we've reached the end
if len(exports) < current_limit:
break
# Move to the next page
current_offset += current_limit
logger.info(f"Retrieved {len(all_exports)} exports in total")
return all_exports
except Exception as e:
logger.error(f"Failed to get exports: {e}")
raise
def get_namespaces(self, offset=0, limit=100):
"""Get list of C++ namespaces."""
try:
# Get all namespaces with pagination
all_namespaces = []
current_offset = 0
current_limit = limit
while True:
response = self._request('GET', 'namespaces', params={"offset": current_offset, "limit": current_limit})
namespaces = response.get("namespaces", [])
if not namespaces:
break
all_namespaces.extend(namespaces)
# If we got fewer namespaces than the limit, we've reached the end
if len(namespaces) < current_limit:
break
# Move to the next page
current_offset += current_limit
logger.info(f"Retrieved {len(all_namespaces)} namespaces in total")
return all_namespaces
except Exception as e:
logger.error(f"Failed to get namespaces: {e}")
raise
def get_defined_data(self, offset=0, limit=100):
"""Get list of defined data variables."""
try:
# Get all defined data with pagination
all_data = []
current_offset = 0
current_limit = limit
while True:
response = self._request('GET', 'data', params={"offset": current_offset, "limit": current_limit})
data_items = response.get("data", [])
if not data_items:
break
all_data.extend(data_items)
# If we got fewer data items than the limit, we've reached the end
if len(data_items) < current_limit:
break
# Move to the next page
current_offset += current_limit
logger.info(f"Retrieved {len(all_data)} data items in total")
return all_data
except Exception as e:
logger.error(f"Failed to get defined data: {e}")
raise
def search_functions(self, query, offset=0, limit=100):
"""Search functions by name."""
try:
# Get all matching functions with pagination
all_matches = []
current_offset = 0
current_limit = limit
while True:
response = self._request('GET', 'searchFunctions', params={"query": query, "offset": current_offset, "limit": current_limit})
matches = response.get("matches", [])
if not matches:
break
all_matches.extend(matches)
# If we got fewer matches than the limit, we've reached the end
if len(matches) < current_limit:
break
# Move to the next page
current_offset += current_limit
logger.info(f"Retrieved {len(all_matches)} matching functions in total")
return all_matches
except Exception as e:
logger.error(f"Failed to search functions: {e}")
raise
def load_binary(self, file_path):
"""Load a binary file."""
try:
response = self._request('POST', 'load', data={"filepath": file_path})
return response
except Exception as e:
logger.error(f"Failed to load binary: {e}")
raise
def rename_function(self, old_name, new_name):
"""Rename a function."""
try:
response = self._request('POST', 'rename/function', data={"oldName": old_name, "newName": new_name})
return response.get("success", False)
except Exception as e:
logger.error(f"Failed to rename function: {e}")
raise
def rename_data(self, address, new_name):
"""Rename a data variable."""
try:
response = self._request('POST', 'rename/data', data={"address": address, "newName": new_name})
return response.get("success", False)
except Exception as e:
logger.error(f"Failed to rename data: {e}")
raise
# Example usage
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <path_to_binary>")
sys.exit(1)
binary_path = sys.argv[1]
client = BinaryNinjaHTTPClient()
# Test the connection
ping_result = client.ping()
print(f"Connection status: {ping_result['status']}")
if ping_result['status'] == 'connected':
print(f"Binary file loaded: {ping_result.get('loaded', False)}")
# Get file info
file_info = client.get_file_info(binary_path)
print(f"\nFile info: {json.dumps(file_info, indent=2)}")
# List functions
functions = client.list_functions(binary_path)
print(f"\nFound {len(functions)} functions")
for i, func in enumerate(functions[:5]): # Show only first 5 functions
print(f"{i+1}. {func['name']} at {hex(func['start'])}")
# Get disassembly of the first function
if functions:
func = functions[0]
disasm = client.get_disassembly(binary_path, function_name=func['name'])
print(f"\nDisassembly of {func['name']}:")
for line in disasm[:10]: # Show only first 10 lines
print(line)