"""
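Advanced memory operations module.
Provides advanced memory operations (protection, comparison, fill, allocation, release, batch operations, etc.).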
"""
from typing import Dict, Any, List, Optional
from ..core.base_controller import BaseController
class MemoryAdvancedModule:
    """Advanced memory operations executed through the debugger script host.

    Every public method except ``batch_read_memory`` renders a small Python
    script, passes it to ``base.script_executor.execute_script_auto`` and
    returns whatever that call returns. Each script reports its outcome by
    printing exactly one line: ``MCP_RESULT:`` followed by a JSON object
    (the same format ``compare_memory`` and ``fill_memory`` already used).

    NOTE(review): the fallback branches of the scripts call ``dbgcmd``, which
    the scripts never define; presumably the script host injects it - confirm.
    """

    # Short aliases accepted by set_memory_protection (looked up upper-cased).
    _PROTECTION_FLAGS = {
        "R": "PAGE_READONLY",
        "READ": "PAGE_READONLY",
        "W": "PAGE_READWRITE",
        "WRITE": "PAGE_READWRITE",
        "RW": "PAGE_READWRITE",
        "X": "PAGE_EXECUTE",
        "EXECUTE": "PAGE_EXECUTE",
        "RX": "PAGE_EXECUTE_READ",
        "RWX": "PAGE_EXECUTE_READWRITE",
        "NONE": "PAGE_NOACCESS",
    }
    _MAX_ALLOC_SIZE = 100 * 1024 * 1024  # upper bound accepted by allocate_memory
    _MAX_BATCH_ADDRESSES = 100  # upper bound accepted by batch_read_memory
    _DEFAULT_READ_SIZE = 64  # bytes read per address when no sizes are given

    def __init__(self, base_controller: "BaseController"):
        """Store the controller whose ``script_executor`` runs the scripts."""
        # The annotation is a string so the class body does not need the
        # BaseController name at definition time.
        self.base = base_controller

    @staticmethod
    def _clean_address(address: str) -> str:
        """Return *address* without surrounding whitespace or inner spaces."""
        return address.strip().replace(" ", "")

    @staticmethod
    def _address_expr(address: str) -> str:
        """Return script source that parses *address* into an int.

        Text starting with ``0x`` (any case) is parsed as hexadecimal,
        everything else as decimal. The text is embedded with ``repr`` so
        quotes in it cannot terminate the literal; parsing still happens
        inside the script, so a malformed address is reported as a script
        error rather than raised here.
        """
        base = 16 if address.lower().startswith("0x") else 10
        return f"int({address!r}, {base})"

    def _run_script(self, title: str, body: str) -> Dict[str, Any]:
        """Wrap *body* in the shared script template and execute it.

        *body* is script source written at column 0 that must assign a
        JSON-serialisable ``payload``. The template imports ``dbg``, prints
        ``payload`` on success and prints an error payload if anything in the
        body raises.

        Only top-level statements are used in the template and the bodies:
        the scripts define no functions or comprehensions, so they do not
        depend on how the host sets up globals/locals for execution.
        """
        if not body.endswith("\n"):
            body += "\n"
        indented = "".join(f"    {line}" for line in body.splitlines(keepends=True))
        script = (
            f"# X64Dbg {title} Script\n"
            # Imported outside the try so the except handler can always use it.
            "import json\n"
            "try:\n"
            "    import dbg\n"
            f"{indented}"
            # default=str keeps non-JSON values returned by dbg printable.
            "    print(\"MCP_RESULT:\" + json.dumps(payload, default=str))\n"
            "except Exception as e:\n"
            "    print(\"MCP_RESULT:\" + json.dumps({'status': 'error', 'error': str(e)}))\n"
        )
        return self.base.script_executor.execute_script_auto(script)

    def set_memory_protection(self, address: str, size: int, protection: str) -> Dict[str, Any]:
        """Set the protection of a memory range.

        Args:
            address: Start address as text; ``0x``-prefixed means hex,
                otherwise decimal. Spaces are ignored.
            size: Number of bytes, embedded in the script as-is.
            protection: One of the short aliases (``R``, ``RW``, ``RX``,
                ``RWX``, ``NONE``, ... case-insensitive); any other value is
                forwarded unchanged.

        Returns:
            The value returned by ``execute_script_auto``.
        """
        address = self._clean_address(address)
        prot_flag = self._PROTECTION_FLAGS.get(protection.upper(), protection)
        body = f"""\
addr = {self._address_expr(address)}
size = {size!r}
prot = {prot_flag!r}
if hasattr(dbg, 'setMemoryProtection'):
    result = dbg.setMemoryProtection(addr, size, prot)
else:
    result = dbgcmd(f'VirtualProtect {{addr}}, {{size}}, {{prot}}')
payload = {{
    'status': 'success',
    'address': {address!r},
    'size': size,
    'protection': {protection!r},
    'result': result,
}}
"""
        return self._run_script("Memory Protection", body)

    def get_memory_protection(self, address: str) -> Dict[str, Any]:
        """Query the protection of the memory at *address*.

        Returns:
            The value returned by ``execute_script_auto``.
        """
        address = self._clean_address(address)
        body = f"""\
addr = {self._address_expr(address)}
if hasattr(dbg, 'getMemoryProtection'):
    prot = dbg.getMemoryProtection(addr)
    payload = {{'status': 'success', 'address': {address!r}, 'protection': prot}}
else:
    result = dbgcmd(f'VirtualQuery {{addr}}')
    payload = {{'status': 'success', 'address': {address!r}, 'result': result}}
"""
        return self._run_script("Get Memory Protection", body)

    def compare_memory(self, address1: str, address2: str, size: int) -> Dict[str, Any]:
        """Compare *size* bytes at two addresses byte by byte.

        The script reports whether the ranges are identical, the number of
        differing bytes and details for at most the first 100 differences.

        Returns:
            The value returned by ``execute_script_auto``.
        """
        address1 = self._clean_address(address1)
        address2 = self._clean_address(address2)
        body = f"""\
addr1 = {self._address_expr(address1)}
addr2 = {self._address_expr(address2)}
size = {size!r}
mem1 = dbg.read(addr1, size)
mem2 = dbg.read(addr2, size)
differences = []
for i in range(size):
    if mem1[i] != mem2[i]:
        differences.append({{
            'offset': i,
            'address1': hex(addr1 + i),
            'address2': hex(addr2 + i),
            'value1': hex(mem1[i]),
            'value2': hex(mem2[i]),
        }})
payload = {{
    'status': 'success',
    'result': {{
        'identical': len(differences) == 0,
        'total_bytes': size,
        'differences_count': len(differences),
        'differences': differences[:100],
    }},
}}
"""
        return self._run_script("Memory Compare", body)

    def fill_memory(self, address: str, size: int, value: int) -> Dict[str, Any]:
        """Fill *size* bytes starting at *address* with the byte *value*.

        Raises:
            ValueError: If *value* is outside 0-255.

        Returns:
            The value returned by ``execute_script_auto``.
        """
        address = self._clean_address(address)
        if not 0 <= value <= 255:
            raise ValueError('填充值必须在0-255之间!')
        body = f"""\
addr = {self._address_expr(address)}
size = {size!r}
fill_value = {value!r}
dbg.write(addr, bytes([fill_value]) * size)
payload = {{'status': 'success', 'address': {address!r}, 'size': size, 'value': fill_value}}
"""
        return self._run_script("Memory Fill", body)

    def allocate_memory(self, size: int, protection: str = "RWX") -> Dict[str, Any]:
        """Allocate *size* bytes in the debuggee.

        Raises:
            ValueError: If *size* is not between 1 byte and 100 MB.

        Returns:
            The value returned by ``execute_script_auto``.
        """
        if not 0 < size <= self._MAX_ALLOC_SIZE:
            raise ValueError('内存大小必须在1字节到100MB之间!')
        # NOTE(review): unlike set_memory_protection, the protection string is
        # forwarded without alias mapping - confirm which form dbg expects.
        body = f"""\
size = {size!r}
prot = {protection!r}
if hasattr(dbg, 'malloc'):
    addr = dbg.malloc(size)
    if addr:
        if prot:
            dbg.setMemoryProtection(addr, size, prot)
        payload = {{'status': 'success', 'address': hex(addr), 'size': size, 'protection': prot}}
    else:
        payload = {{'status': 'error', 'error': '内存分配失败'}}
else:
    result = dbgcmd(f'alloc {{size}}, {{prot}}')
    payload = {{'status': 'success', 'result': result}}
"""
        return self._run_script("Allocate Memory", body)

    def free_memory(self, address: str) -> Dict[str, Any]:
        """Free the memory at *address*.

        Returns:
            The value returned by ``execute_script_auto``.
        """
        address = self._clean_address(address)
        body = f"""\
addr = {self._address_expr(address)}
if hasattr(dbg, 'free'):
    result = dbg.free(addr)
else:
    result = dbgcmd(f'free {{addr}}')
payload = {{'status': 'success', 'address': {address!r}, 'result': result}}
"""
        return self._run_script("Free Memory", body)

    def get_memory_region_info(self, address: str) -> Dict[str, Any]:
        """Query information about the memory region containing *address*.

        Returns:
            The value returned by ``execute_script_auto``.
        """
        address = self._clean_address(address)
        body = f"""\
addr = {self._address_expr(address)}
if hasattr(dbg, 'getMemoryRegionInfo'):
    info = dbg.getMemoryRegionInfo(addr)
    payload = {{'status': 'success', 'address': {address!r}, 'info': info}}
else:
    result = dbgcmd(f'VirtualQuery {{addr}}')
    payload = {{'status': 'success', 'address': {address!r}, 'result': result}}
"""
        return self._run_script("Get Memory Region Info", body)

    def batch_read_memory(self, addresses: List[str], sizes: Optional[List[int]] = None) -> Dict[str, Any]:
        """Read several memory ranges through ``MemoryBasicModule.read_memory``.

        Args:
            addresses: 1 to 100 addresses to read.
            sizes: Optional per-address sizes; must match *addresses* in
                length. When missing or empty, 64 bytes are read per address.

        Raises:
            ValueError: If *addresses* is empty, holds more than 100 entries,
                or *sizes* has a different length.

        Returns:
            A dict with ``status``, ``total`` (number of requested addresses)
            and ``results`` mapping each address to its read result. Repeated
            addresses share one ``results`` entry (the last read wins).
        """
        if not addresses:
            raise ValueError('地址列表不能为空!')
        if len(addresses) > self._MAX_BATCH_ADDRESSES:
            raise ValueError('一次最多读取100个地址!')
        if sizes and len(sizes) != len(addresses):
            raise ValueError('大小列表长度必须与地址列表长度相同!')
        read_sizes = sizes if sizes else [self._DEFAULT_READ_SIZE] * len(addresses)

        # Reads are delegated to the basic memory module (imported lazily,
        # as in the original code).
        from .memory_basic import MemoryBasicModule
        basic_module = MemoryBasicModule(self.base)

        results = {}
        for addr, size in zip(addresses, read_sizes):
            results[addr] = basic_module.read_memory(addr, size)
        return {
            "status": "success",
            "total": len(addresses),
            "results": results,
        }