Binary Reader MCP
by berlinbra
Verified
from typing import Any
import asyncio
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
import os
from src.binary_reader.unreal_reader import UnrealAssetReader
from src.binary_reader.base_reader import BinaryReader
server = Server("binary_reader")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""
List available tools for binary file analysis.
Each tool specifies its arguments using JSON Schema validation.
"""
return [
types.Tool(
name="read-unreal-asset",
description="Read and analyze Unreal Engine .uasset file structure",
inputSchema={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the .uasset file",
},
},
"required": ["file_path"],
},
),
types.Tool(
name="read-binary-metadata",
description="Read generic binary file metadata and structure",
inputSchema={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the binary file",
},
"format": {
"type": "string",
"description": "File format specification (if known)",
"enum": ["auto", "unreal", "custom"],
"default": "auto"
}
},
"required": ["file_path"],
},
)
]
def format_unreal_asset_data(data: dict) -> str:
"""Format Unreal asset data into a readable string."""
try:
return (
f"Unreal Asset Analysis:\n\n"
f"Header Information:\n"
f"Magic: {data.get('header', {}).get('magic', 'N/A')}\n"
f"Legacy Version: {data.get('header', {}).get('legacy_version', 'N/A')}\n"
f"UE4 Version: {data.get('header', {}).get('file_version_ue4', 'N/A')}\n"
f"File Size: {data.get('header', {}).get('file_size', 'N/A')} bytes\n\n"
f"Metadata:\n"
f"Flags: {data.get('metadata', {}).get('flags', 'N/A')}\n"
f"Element Count: {data.get('metadata', {}).get('element_count', 'N/A')}\n"
f"Bulk Data Size: {data.get('metadata', {}).get('bulk_data_size', 'N/A')} bytes\n"
"---"
)
except Exception as e:
return f"Error formatting asset data: {str(e)}"
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""
Handle tool execution requests.
Tools can read and analyze binary files.
"""
if not arguments:
return [types.TextContent(type="text", text="Missing arguments for the request")]
if name == "read-unreal-asset":
file_path = arguments.get("file_path")
if not file_path:
return [types.TextContent(type="text", text="Missing file_path parameter")]
try:
with UnrealAssetReader(file_path) as reader:
data = {
"header": reader.read_header(),
"metadata": reader.read_metadata(),
}
formatted_data = format_unreal_asset_data(data)
return [types.TextContent(type="text", text=formatted_data)]
except FileNotFoundError:
return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")]
except ValueError as e:
return [types.TextContent(type="text", text=f"Error: {str(e)}")]
except Exception as e:
return [types.TextContent(type="text", text=f"Unexpected error: {str(e)}")]
elif name == "read-binary-metadata":
file_path = arguments.get("file_path")
if not file_path:
return [types.TextContent(type="text", text="Missing file_path parameter")]
format_type = arguments.get("format", "auto")
try:
# Auto-detect format or use specified format
if format_type == "unreal" or (format_type == "auto" and file_path.endswith(".uasset")):
reader_class = UnrealAssetReader
else:
reader_class = BinaryReader
with reader_class(file_path) as reader:
metadata = {
"file_size": os.path.getsize(file_path),
"header": reader.read_header() if hasattr(reader, "read_header") else None,
"metadata": reader.read_metadata() if hasattr(reader, "read_metadata") else None
}
formatted_text = (
f"Binary File Analysis:\n\n"
f"File Path: {file_path}\n"
f"File Size: {metadata['file_size']} bytes\n"
f"Format: {format_type}\n\n"
)
if metadata["header"]:
formatted_text += f"Header Information:\n{metadata['header']}\n\n"
if metadata["metadata"]:
formatted_text += f"Metadata Information:\n{metadata['metadata']}"
return [types.TextContent(type="text", text=formatted_text)]
except FileNotFoundError:
return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")]
except Exception as e:
return [types.TextContent(type="text", text=f"Error analyzing file: {str(e)}")]
else:
return [types.TextContent(type="text", text=f"Unknown tool: {name}")]
async def main():
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="binary_reader",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == "__main__":
asyncio.run(main())