Binary Reader MCP

by berlinbra
Verified
from typing import Any import asyncio from mcp.server.models import InitializationOptions import mcp.types as types from mcp.server import NotificationOptions, Server import mcp.server.stdio import os from src.binary_reader.unreal_reader import UnrealAssetReader from src.binary_reader.base_reader import BinaryReader server = Server("binary_reader") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """ List available tools for binary file analysis. Each tool specifies its arguments using JSON Schema validation. """ return [ types.Tool( name="read-unreal-asset", description="Read and analyze Unreal Engine .uasset file structure", inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the .uasset file", }, }, "required": ["file_path"], }, ), types.Tool( name="read-binary-metadata", description="Read generic binary file metadata and structure", inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the binary file", }, "format": { "type": "string", "description": "File format specification (if known)", "enum": ["auto", "unreal", "custom"], "default": "auto" } }, "required": ["file_path"], }, ) ] def format_unreal_asset_data(data: dict) -> str: """Format Unreal asset data into a readable string.""" try: return ( f"Unreal Asset Analysis:\n\n" f"Header Information:\n" f"Magic: {data.get('header', {}).get('magic', 'N/A')}\n" f"Legacy Version: {data.get('header', {}).get('legacy_version', 'N/A')}\n" f"UE4 Version: {data.get('header', {}).get('file_version_ue4', 'N/A')}\n" f"File Size: {data.get('header', {}).get('file_size', 'N/A')} bytes\n\n" f"Metadata:\n" f"Flags: {data.get('metadata', {}).get('flags', 'N/A')}\n" f"Element Count: {data.get('metadata', {}).get('element_count', 'N/A')}\n" f"Bulk Data Size: {data.get('metadata', {}).get('bulk_data_size', 'N/A')} bytes\n" "---" ) except Exception as e: return f"Error formatting asset data: {str(e)}" @server.call_tool() async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ Handle tool execution requests. Tools can read and analyze binary files. """ if not arguments: return [types.TextContent(type="text", text="Missing arguments for the request")] if name == "read-unreal-asset": file_path = arguments.get("file_path") if not file_path: return [types.TextContent(type="text", text="Missing file_path parameter")] try: with UnrealAssetReader(file_path) as reader: data = { "header": reader.read_header(), "metadata": reader.read_metadata(), } formatted_data = format_unreal_asset_data(data) return [types.TextContent(type="text", text=formatted_data)] except FileNotFoundError: return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")] except ValueError as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] except Exception as e: return [types.TextContent(type="text", text=f"Unexpected error: {str(e)}")] elif name == "read-binary-metadata": file_path = arguments.get("file_path") if not file_path: return [types.TextContent(type="text", text="Missing file_path parameter")] format_type = arguments.get("format", "auto") try: # Auto-detect format or use specified format if format_type == "unreal" or (format_type == "auto" and file_path.endswith(".uasset")): reader_class = UnrealAssetReader else: reader_class = BinaryReader with reader_class(file_path) as reader: metadata = { "file_size": os.path.getsize(file_path), "header": reader.read_header() if hasattr(reader, "read_header") else None, "metadata": reader.read_metadata() if hasattr(reader, "read_metadata") else None } formatted_text = ( f"Binary File Analysis:\n\n" f"File Path: {file_path}\n" f"File Size: {metadata['file_size']} bytes\n" f"Format: {format_type}\n\n" ) if metadata["header"]: formatted_text += f"Header Information:\n{metadata['header']}\n\n" if metadata["metadata"]: formatted_text += f"Metadata Information:\n{metadata['metadata']}" return [types.TextContent(type="text", text=formatted_text)] except FileNotFoundError: return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")] except Exception as e: return [types.TextContent(type="text", text=f"Error analyzing file: {str(e)}")] else: return [types.TextContent(type="text", text=f"Unknown tool: {name}")] async def main(): async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="binary_reader", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": asyncio.run(main())