Skip to main content
Glama

Binary Ninja Cline MCP Server

by opensensor
binaryninja_http_client.py (20.2 kB)
#!/usr/bin/env python3
"""
Binary Ninja HTTP API Client

This module provides a client for interacting with the Binary Ninja HTTP API
server. The Binary Ninja personal license runs a server on localhost:9009 that
we can connect to.
"""

import json
import logging
import time

import requests

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('BinaryNinjaClient')


class BinaryNinjaHTTPClient:
    """Client for interacting with the Binary Ninja HTTP API server.

    All endpoint methods return the decoded JSON payload (or a list/dict built
    from it). Unless stated otherwise, failures are logged and the original
    exception is re-raised.
    """

    def __init__(self, host='localhost', port=9009, fake_on_failure=True):
        """Initialize the client with the server address.

        Args:
            host: Host name of the Binary Ninja HTTP server.
            port: TCP port of the Binary Ninja HTTP server.
            fake_on_failure: When True (the default, matching the historical
                behaviour), ``ping`` and ``get_status`` return a canned
                "test.bndb" response if the server cannot be reached, so the
                MCP server can be exercised without Binary Ninja running.
                Pass False to get real failures instead.
        """
        self.base_url = f"http://{host}:{port}"
        self.session = requests.Session()
        self.fake_on_failure = fake_on_failure
        logger.info(f"Initialized Binary Ninja HTTP client for {self.base_url}")

    def _request(self, method, endpoint, data=None, params=None, timeout=60):
        """Make a request to the Binary Ninja HTTP API.

        Args:
            method: 'GET' or 'POST'.
            endpoint: Path relative to the base URL (no leading slash).
            data: JSON body, sent for POST requests only.
            params: Query parameters, sent for GET requests only.
            timeout: Per-request timeout in seconds.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: If ``method`` is unsupported or the body is not JSON.
            requests.exceptions.RequestException: On transport or HTTP errors.
        """
        # Validate before touching the network; this is a caller bug, not a
        # transport error, so it is deliberately outside the try block.
        if method not in ('GET', 'POST'):
            raise ValueError(f"Unsupported HTTP method: {method}")

        url = f"{self.base_url}/{endpoint}"
        try:
            if method == 'GET':
                response = self.session.get(url, params=params, timeout=timeout)
            else:
                response = self.session.post(url, json=data, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # decode error does not derive from RequestException.
            logger.error(f"Error making request to {url}: {e}")
            raise

    def _paginate(self, endpoint, key, offset=0, limit=100, extra_params=None):
        """Collect every item of a paginated GET endpoint.

        Pages are requested with ``offset``/``limit`` query parameters until an
        empty or short page is returned.

        Args:
            endpoint: Endpoint name, e.g. 'functions'.
            key: Key of the item list inside each JSON response.
            offset: Index of the first item to fetch.
            limit: Page size; must be at least 1.
            extra_params: Additional query parameters sent with every page.

        Returns:
            A list with all items from ``offset`` to the end.

        Raises:
            ValueError: If ``limit`` is smaller than 1 (the offset would never
                advance, so the loop could not terminate).
        """
        if limit < 1:
            raise ValueError(f"limit must be >= 1, got {limit}")

        items = []
        current_offset = offset
        while True:
            params = dict(extra_params or {}, offset=current_offset, limit=limit)
            response = self._request('GET', endpoint, params=params)
            page = response.get(key, [])
            if not page:
                break
            items.extend(page)
            # A short page means we have reached the end.
            if len(page) < limit:
                break
            current_offset += limit
        return items

    def ping(self):
        """Test the connection to the Binary Ninja server.

        Returns:
            ``{"status": "connected", "loaded": ..., "filename": ...}`` on
            success. On failure, a canned "connected" response when
            ``fake_on_failure`` is set, otherwise
            ``{"status": "disconnected", "error": ...}``. Never raises for
            connection problems.
        """
        try:
            status = self._request('GET', 'status')
        except Exception as e:
            if not self.fake_on_failure:
                logger.error(f"Failed to ping Binary Ninja server: {e}")
                return {"status": "disconnected", "error": str(e)}
            # Useful for testing the MCP server without a running
            # Binary Ninja instance.
            logger.warning(f"Failed to connect to Binary Ninja server: {e}")
            logger.warning("Returning fake response for testing purposes")
            return {
                "status": "connected",
                "loaded": True,
                "filename": "test.bndb"
            }
        return {
            "status": "connected",
            "loaded": status.get("loaded", False),
            "filename": status.get("filename", "")
        }

    def get_status(self):
        """Get the current status of the binary view.

        Returns:
            The server's status payload, or a canned
            ``{"loaded": True, "filename": "test.bndb"}`` when the request
            fails and ``fake_on_failure`` is set.

        Raises:
            Exception: The original request error when ``fake_on_failure`` is
                False.
        """
        try:
            return self._request('GET', 'status')
        except Exception as e:
            if not self.fake_on_failure:
                logger.error(f"Failed to get status: {e}")
                raise
            # Useful for testing the MCP server without a running
            # Binary Ninja instance.
            logger.warning(f"Failed to get status from Binary Ninja server: {e}")
            logger.warning("Returning fake status for testing purposes")
            return {
                "loaded": True,
                "filename": "test.bndb"
            }

    def get_file_info(self, file_path):
        """Get information about the currently open file.

        Only the filename comes from the server; every other field is a fixed
        placeholder because the API does not expose it.

        Args:
            file_path: Unused; kept for interface compatibility.
        """
        try:
            status = self.get_status()
            return {
                "filename": status.get("filename", ""),
                "arch": {"name": "unknown"},  # We don't have access to this info
                "platform": {"name": "unknown"},  # We don't have access to this info
                "entry_point": 0,  # We don't have access to this info
                "file_size": 0,  # We don't have access to this info
                "executable": True,  # Assume it's executable
                "relocatable": False,  # Assume it's not relocatable
                "address_size": 64  # Assume 64-bit
            }
        except Exception as e:
            logger.error(f"Failed to get file info: {e}")
            raise

    def list_functions(self, file_path=None):
        """List all functions in the currently open binary file.

        Args:
            file_path: Unused; kept for interface compatibility.
        """
        try:
            all_functions = self._paginate('functions', 'functions')
            logger.info(f"Retrieved {len(all_functions)} functions in total")
            return all_functions
        except Exception as e:
            logger.error(f"Failed to list functions: {e}")
            raise

    def get_function(self, file_path=None, function_name=None, function_address=None):
        """Get information about a specific function.

        A name match takes precedence over an address match. The address is
        compared against both the "address" and "start" fields of each entry.

        Returns:
            The matching function entry, or None if nothing matches.
        """
        try:
            functions = self.list_functions()

            if function_name:
                for func in functions:
                    if func.get("name") == function_name:
                        return func

            # Compare against None so that address 0 is still looked up.
            if function_address is not None:
                for func in functions:
                    if (func.get("address") == function_address
                            or func.get("start") == function_address):
                        return func

            return None
        except Exception as e:
            logger.error(f"Failed to get function info: {e}")
            raise

    def get_disassembly(self, file_path=None, function_name=None, function_address=None):
        """Get the disassembly of a specific function.

        There is no direct disassembly endpoint, so this looks the function up
        via 'searchFunctions' and returns its name, address and (best effort)
        decompiled code as a list of text lines.

        Returns:
            A list of strings. Lookup problems are reported as a single
            descriptive line rather than raised.
        """
        identifier = function_name if function_name else function_address
        if identifier is None:
            return ["No function identifier provided"]
        if not isinstance(identifier, str):
            identifier = str(identifier)

        try:
            response = self._request('GET', 'searchFunctions',
                                     params={"query": identifier})
            matches = response.get("matches", [])
            if not matches:
                return [f"Function '{identifier}' not found"]

            # The search may return more than one candidate; prefer an exact
            # name match and fall back to the first result.
            func = next((m for m in matches if m.get('name') == identifier),
                        matches[0])

            disasm = [
                f"Function: {func.get('name', 'unknown')}",
                f"Address: {func.get('address', '0x0')}",
            ]

            # Decompiled code is shown as pseudo-disassembly; it is optional,
            # so a failure here must not discard the header lines.
            try:
                decompiled = self.get_hlil(file_path, function_name=func.get('name'))
                if decompiled and decompiled != "No decompilation available":
                    disasm.append("Decompiled code:")
                    disasm.extend(f" {line}" for line in decompiled.split("\n"))
            except Exception as e:
                logger.debug(f"Skipping decompiled code in disassembly: {e}")

            return disasm
        except Exception as e:
            logger.warning(f"Failed to get function info: {e}")
            return [f"Error getting disassembly: {e}"]

    def get_hlil(self, file_path=None, function_name=None, function_address=None):
        """Get the high-level IL (decompiled code) of a specific function.

        Returns:
            The decompiled text, or a ``//``-prefixed comment string describing
            why decompilation is unavailable. Request failures are reported in
            the returned string rather than raised.
        """
        identifier = function_name if function_name else function_address
        if identifier is None:
            return "No function identifier provided"
        if not isinstance(identifier, str):
            identifier = str(identifier)

        try:
            response = self._request('GET', 'decompile', params={"name": identifier})
            if "error" in response:
                return f"// {response.get('error')}\n// {response.get('reason', '')}"
            return response.get("decompiled", "No decompilation available")
        except Exception as e:
            logger.warning(f"Failed to get decompilation: {e}")
            return f"// Decompilation failed: {e}"

    def get_types(self, file_path=None):
        """Get all types defined in a binary file.

        Placeholder: types are not available through this API, so an empty
        dict is always returned.
        """
        return {}

    def get_sections(self, file_path=None):
        """Get all sections in a binary file (fetched from the 'segments' endpoint).

        Args:
            file_path: Unused; kept for interface compatibility.
        """
        try:
            all_segments = self._paginate('segments', 'segments')
            logger.info(f"Retrieved {len(all_segments)} segments in total")
            return all_segments
        except Exception as e:
            logger.error(f"Failed to get sections: {e}")
            raise

    def get_strings(self, file_path=None, min_length=4):
        """Get all strings in a binary file.

        Placeholder: strings are not available through this API, so an empty
        list is always returned.
        """
        return []

    def get_xrefs(self, file_path=None, address=None):
        """Get cross-references to a specific address.

        Placeholder: xrefs are not available through this API, so an empty
        list is always returned.
        """
        return []

    def get_imports(self, offset=0, limit=100):
        """Get the imported functions from ``offset`` onwards.

        Args:
            offset: Index of the first import to return.
            limit: Page size used while fetching; must be at least 1.
        """
        try:
            all_imports = self._paginate('imports', 'imports', offset, limit)
            logger.info(f"Retrieved {len(all_imports)} imports in total")
            return all_imports
        except Exception as e:
            logger.error(f"Failed to get imports: {e}")
            raise

    def get_exports(self, offset=0, limit=100):
        """Get the exported symbols from ``offset`` onwards.

        Args:
            offset: Index of the first export to return.
            limit: Page size used while fetching; must be at least 1.
        """
        try:
            all_exports = self._paginate('exports', 'exports', offset, limit)
            logger.info(f"Retrieved {len(all_exports)} exports in total")
            return all_exports
        except Exception as e:
            logger.error(f"Failed to get exports: {e}")
            raise

    def get_namespaces(self, offset=0, limit=100):
        """Get the C++ namespaces from ``offset`` onwards.

        Args:
            offset: Index of the first namespace to return.
            limit: Page size used while fetching; must be at least 1.
        """
        try:
            all_namespaces = self._paginate('namespaces', 'namespaces', offset, limit)
            logger.info(f"Retrieved {len(all_namespaces)} namespaces in total")
            return all_namespaces
        except Exception as e:
            logger.error(f"Failed to get namespaces: {e}")
            raise

    def get_defined_data(self, offset=0, limit=100):
        """Get the defined data variables from ``offset`` onwards.

        Args:
            offset: Index of the first data item to return.
            limit: Page size used while fetching; must be at least 1.
        """
        try:
            all_data = self._paginate('data', 'data', offset, limit)
            logger.info(f"Retrieved {len(all_data)} data items in total")
            return all_data
        except Exception as e:
            logger.error(f"Failed to get defined data: {e}")
            raise

    def search_functions(self, query, offset=0, limit=100):
        """Search functions by name.

        Args:
            query: Search string sent to the 'searchFunctions' endpoint.
            offset: Index of the first match to return.
            limit: Page size used while fetching; must be at least 1.
        """
        try:
            all_matches = self._paginate('searchFunctions', 'matches', offset, limit,
                                         extra_params={"query": query})
            logger.info(f"Retrieved {len(all_matches)} matching functions in total")
            return all_matches
        except Exception as e:
            logger.error(f"Failed to search functions: {e}")
            raise

    def load_binary(self, file_path):
        """Load a binary file and return the server's response."""
        try:
            return self._request('POST', 'load', data={"filepath": file_path})
        except Exception as e:
            logger.error(f"Failed to load binary: {e}")
            raise

    def rename_function(self, old_name, new_name):
        """Rename a function; return the response's "success" flag (default False)."""
        try:
            response = self._request('POST', 'rename/function',
                                     data={"oldName": old_name, "newName": new_name})
            return response.get("success", False)
        except Exception as e:
            logger.error(f"Failed to rename function: {e}")
            raise

    def rename_data(self, address, new_name):
        """Rename a data variable; return the response's "success" flag (default False)."""
        try:
            response = self._request('POST', 'rename/data',
                                     data={"address": address, "newName": new_name})
            return response.get("success", False)
        except Exception as e:
            logger.error(f"Failed to rename data: {e}")
            raise


def _format_address(value):
    """Render a function address for display, whatever type the server sent."""
    if value is None:
        return "unknown"
    # bool is an int subclass but is never a meaningful address.
    if isinstance(value, int) and not isinstance(value, bool):
        return hex(value)
    return str(value)


def main():
    """Example usage: ping the server and print a summary of the loaded binary."""
    import sys

    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <path_to_binary>")
        sys.exit(1)

    binary_path = sys.argv[1]
    client = BinaryNinjaHTTPClient()

    # Test the connection
    ping_result = client.ping()
    print(f"Connection status: {ping_result['status']}")
    if ping_result['status'] != 'connected':
        return

    print(f"Binary file loaded: {ping_result.get('loaded', False)}")

    # Get file info
    file_info = client.get_file_info(binary_path)
    print(f"\nFile info: {json.dumps(file_info, indent=2)}")

    # List functions
    functions = client.list_functions(binary_path)
    print(f"\nFound {len(functions)} functions")
    for i, func in enumerate(functions[:5], start=1):  # Show only first 5 functions
        # Entries may carry the address under "start" or "address".
        address = func.get('start', func.get('address'))
        print(f"{i}. {func.get('name', 'unknown')} at {_format_address(address)}")

    # Get disassembly of the first function
    if functions:
        name = functions[0].get('name')
        disasm = client.get_disassembly(binary_path, function_name=name)
        print(f"\nDisassembly of {name}:")
        for line in disasm[:10]:  # Show only first 10 lines
            print(line)


# Example usage
if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/opensensor/bn_cline_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.