Skip to main content
Glama
Evan7198

PCILeech MCP Server

by Evan7198
pcileech_wrapper.py8.8 kB
""" PCILeech wrapper for memory operations. This module provides a Python interface to PCILeech command-line tool for DMA-based memory read/write operations. """ import subprocess import os import json import re from typing import Optional, Tuple from pathlib import Path class PCILeechError(Exception): """Base exception for PCILeech operations.""" pass class PCILeechWrapper: """Wrapper for PCILeech command-line tool.""" def __init__(self, config_path: Optional[str] = None): """Initialize PCILeech wrapper with configuration.""" # Default to config.json in the same directory as this script if config_path is None: script_dir = Path(__file__).parent config_path = script_dir / "config.json" with open(config_path, 'r') as f: config = json.load(f) # Make executable path absolute relative to config file location exe_path = config['pcileech']['executable_path'] if not os.path.isabs(exe_path): config_dir = Path(config_path).parent exe_path = str(config_dir / exe_path) self.executable = exe_path self.timeout = config['pcileech']['timeout_seconds'] # Verify PCILeech executable exists if not os.path.exists(self.executable): raise PCILeechError(f"PCILeech executable not found at: {self.executable}") def _run_command(self, args: list[str]) -> Tuple[str, str, int]: """ Execute PCILeech command and return output. Args: args: Command-line arguments for PCILeech Returns: Tuple of (stdout, stderr, returncode) Raises: PCILeechError: If command execution fails """ try: result = subprocess.run( [self.executable] + args, capture_output=True, text=True, timeout=self.timeout, cwd=os.path.dirname(self.executable) ) return result.stdout, result.stderr, result.returncode except subprocess.TimeoutExpired: raise PCILeechError(f"PCILeech command timed out after {self.timeout} seconds") except Exception as e: raise PCILeechError(f"Failed to execute PCILeech: {str(e)}") def read_memory(self, address: str, length: int) -> bytes: """ Read memory from specified address. Uses PCILeech 'display' command which always returns 256 bytes aligned to 16-byte boundaries. This method handles alignment and extraction of the requested data range. Args: address: Memory address in hex format (e.g., "0x1000" or "1000") length: Number of bytes to read Returns: bytes: Memory content Raises: PCILeechError: If read operation fails """ # Normalize address format (remove 0x prefix if present) addr = address.lower().replace('0x', '') target_addr = int(addr, 16) # Display command returns 256 bytes aligned to 16-byte boundaries # We need to read in 256-byte chunks and extract what we need DISPLAY_SIZE = 256 # Display always returns 256 bytes ALIGN_SIZE = 16 # Display aligns to 16-byte boundaries all_data = bytearray() bytes_remaining = length current_addr = target_addr while bytes_remaining > 0: # Calculate aligned address (16-byte boundary) aligned_addr = (current_addr // ALIGN_SIZE) * ALIGN_SIZE # Read 256-byte chunk using display args = ['display', '-min', f'0x{aligned_addr:x}'] stdout, stderr, returncode = self._run_command(args) if returncode != 0: raise PCILeechError(f"Memory read failed: {stderr}") # Parse the 256-byte chunk chunk_hex = self._parse_display_output(stdout) if not chunk_hex: raise PCILeechError(f"Failed to parse PCILeech output") chunk_data = bytes.fromhex(chunk_hex) # Verify we got 256 bytes if len(chunk_data) != DISPLAY_SIZE: raise PCILeechError(f"Expected {DISPLAY_SIZE} bytes, got {len(chunk_data)}") # Calculate offset within this chunk offset_in_chunk = current_addr - aligned_addr # Calculate how many bytes to extract from this chunk bytes_from_chunk = min(DISPLAY_SIZE - offset_in_chunk, bytes_remaining) # Extract the needed portion extracted = chunk_data[offset_in_chunk : offset_in_chunk + bytes_from_chunk] all_data.extend(extracted) # Update counters bytes_remaining -= bytes_from_chunk current_addr += bytes_from_chunk return bytes(all_data) def write_memory(self, address: str, data: bytes) -> bool: """ Write data to memory at specified address. Args: address: Memory address in hex format data: Data to write Returns: bool: True if write succeeded Raises: PCILeechError: If write operation fails """ # Normalize address addr = address.lower().replace('0x', '') # Convert data to hex string hex_data = data.hex() # PCILeech command: write -min <address> -in <hex_data> args = [ 'write', '-min', f'0x{addr}', '-in', hex_data, ] stdout, stderr, returncode = self._run_command(args) if returncode != 0: raise PCILeechError(f"Memory write failed: {stderr}") return True def _parse_display_output(self, output: str) -> str: """ Parse PCILeech display output to extract hex data. Format example: 0000 e9 4d 06 00 01 00 00 00 01 00 00 00 3f 00 18 10 .M..........?... ^offset ^8 hex bytes ^8 hex bytes ^ASCII The format is fixed: - 4 char hex offset - 4 spaces - 47 chars of hex data (16 bytes as "xx xx xx xx xx xx xx xx xx xx xx xx xx xx xx xx") - 3 spaces - 16 chars ASCII representation Args: output: Raw PCILeech display output Returns: str: Concatenated hex data (without spaces) """ hex_data = [] for line in output.splitlines(): # Skip headers and empty lines if not line or 'Memory Display:' in line or 'Contents for address:' in line: continue line = line.rstrip() # Only strip trailing whitespace # Match lines starting with 4-digit hex offset if re.match(r'^[0-9a-fA-F]{4}\s+', line): # More robust method: extract all hex byte pairs using regex # This finds all 2-character hex sequences that are word-bounded # Skip the first match which is the offset # First, try to extract hex data from fixed positions # Format: "0060 xx xx xx xx xx xx xx xx xx xx xx xx xx xx xx xx ASCII" # Position: 0-3=offset, 4-7=spaces, 8-54=hex data (47 chars), 55-57=spaces, 58+=ASCII if len(line) >= 56: # Extract the hex portion (positions 8-55, which is 48 chars) # Format: "xx xx xx xx xx xx xx xx xx xx xx xx xx xx xx xx" # = 8*3-1 + 2 + 8*3-1 = 23 + 2 + 23 = 48 chars hex_portion = line[8:56] else: # Fallback for shorter lines hex_portion = line[4:].lstrip() # Try to find where ASCII starts (after 3+ spaces following hex data) ascii_match = re.search(r'\s{3,}[^\s]', hex_portion) if ascii_match: hex_portion = hex_portion[:ascii_match.start()] # Extract all hex byte pairs (2 consecutive hex chars) hex_bytes = re.findall(r'[0-9a-fA-F]{2}', hex_portion) # Validate we got reasonable number of bytes (should be 16 per line) if hex_bytes: hex_data.append(''.join(hex_bytes)) return ''.join(hex_data) def verify_connection(self) -> bool: """ Verify PCILeech is working and hardware is connected. Returns: bool: True if connection is valid """ try: # Use 'info' command which works for all device types stdout, stderr, returncode = self._run_command(['info']) return returncode == 0 except Exception: return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan7198/mcp_server_pcileech'

If you have feedback or need assistance with the MCP directory API, please join our Discord server