mcp_msi_server.py•13.4 kB
from fastmcp import FastMCP
from typing import Optional, Dict, Any, List
import subprocess
import json
import os
import shutil
mcp = FastMCP(name="msi-metadata-server")
SCRIPT_PATH = os.path.join(os.path.dirname(__file__), 'scripts', 'ReadMsiSimple.ps1')
def _run_powershell_simple(msi_path: str):
if not os.path.exists(msi_path):
raise FileNotFoundError(f"MSI path not found: {msi_path}")
if not os.path.exists(SCRIPT_PATH):
raise FileNotFoundError(f"Reader script missing: {SCRIPT_PATH}")
if os.name != 'nt':
raise RuntimeError('This MSI reader currently supports Windows only.')
args = [
'powershell', '-NoProfile', '-ExecutionPolicy', 'Bypass',
'-File', SCRIPT_PATH,
msi_path,
]
completed = subprocess.run(args, capture_output=True, text=True, check=False)
if completed.returncode != 0:
stderr = (completed.stderr or '').strip()
stdout = (completed.stdout or '').strip()
raise RuntimeError(f"PowerShell error ({completed.returncode}): {stderr or stdout}")
stdout = (completed.stdout or '').strip()
if not stdout:
raise RuntimeError('No output from PowerShell reader')
return json.loads(stdout)
# Existing core tools
@mcp.tool
def read_msi_metadata(msi_path: str) -> Dict[str, Optional[str]]:
"""Read MSI metadata and return ProductCode, ProductName, ProductVersion, UpgradeCode, Manufacturer."""
meta = _run_powershell(msi_path, 'core')
return {
'ProductCode': meta.get('ProductCode'),
'ProductName': meta.get('ProductName'),
'ProductVersion': meta.get('ProductVersion'),
'UpgradeCode': meta.get('UpgradeCode'),
'Manufacturer': meta.get('Manufacturer'),
}
@mcp.tool
def suggest_silent_commands(msi_path: str) -> Dict[str, str]:
"""Return silent install and uninstall commands based on MSI metadata."""
meta = _run_powershell(msi_path, 'core')
product_code = meta.get('ProductCode')
if not product_code:
raise RuntimeError('Failed to read ProductCode from MSI')
return {
'install': f'msiexec /i "{msi_path}" /qn /norestart',
'uninstall': f'msiexec /x {product_code} /qn /norestart',
}
# Convenience getters
@mcp.tool
def get_product_code(msi_path: str) -> str:
meta = _run_powershell(msi_path, 'core')
pc = meta.get('ProductCode')
if not pc:
raise RuntimeError('Failed to read ProductCode from MSI')
return pc
@mcp.tool
def get_product_name(msi_path: str) -> str:
meta = _run_powershell(msi_path, 'core')
name = meta.get('ProductName')
if not name:
raise RuntimeError('Failed to read ProductName from MSI')
return name
@mcp.tool
def get_product_version(msi_path: str) -> str:
meta = _run_powershell(msi_path, 'core')
ver = meta.get('ProductVersion')
if not ver:
raise RuntimeError('Failed to read ProductVersion from MSI')
return ver
# Build msiexec commands
@mcp.tool
def build_install_command(msi_path: str, log_path: Optional[str] = None, extra_args: Optional[str] = None) -> str:
parts = ["msiexec", "/i", f'"{msi_path}"', "/qn", "/norestart"]
if log_path:
parts.extend(["/L*v", log_path])
if extra_args:
parts.append(extra_args)
return " ".join(parts)
@mcp.tool
def build_uninstall_command(product_code: str, log_path: Optional[str] = None, extra_args: Optional[str] = None) -> str:
parts = ["msiexec", "/x", product_code, "/qn", "/norestart"]
if log_path:
parts.extend(["/L*v", log_path])
if extra_args:
parts.append(extra_args)
return " ".join(parts)
@mcp.tool
def run_msiexec(command: str, confirm: bool = False) -> Dict[str, Any]:
"""Execute an msiexec command. Set confirm=True to proceed."""
if not confirm:
raise RuntimeError('Refusing to execute without confirm=True')
completed = subprocess.run(command, shell=True, capture_output=True, text=True)
return {
'returncode': completed.returncode,
'stdout': completed.stdout,
'stderr': completed.stderr,
}
# New tools exposing additional MSI data
@mcp.tool
def list_properties(msi_path: str) -> Dict[str, Optional[str]]:
"""Return the full Property table as a dict."""
return _run_powershell(msi_path, 'properties')
@mcp.tool
def list_features(msi_path: str) -> List[Dict[str, Optional[str]]]:
"""Return rows from the Feature table."""
return _run_powershell(msi_path, 'features')
@mcp.tool
def list_components(msi_path: str) -> List[Dict[str, Optional[str]]]:
"""Return rows from the Component table."""
return _run_powershell(msi_path, 'components')
@mcp.tool
def list_files(msi_path: str) -> List[Dict[str, Optional[str]]]:
"""Return rows from the File table."""
return _run_powershell(msi_path, 'files')
@mcp.tool
def list_custom_actions(msi_path: str) -> List[Dict[str, Optional[str]]]:
"""Return rows from the CustomAction table."""
return _run_powershell(msi_path, 'custom_actions')
@mcp.tool
def list_shortcuts(msi_path: str) -> List[Dict[str, Optional[str]]]:
"""Return rows from the Shortcut table (Shortcut, Directory, Name, Target)."""
return _run_powershell(msi_path, 'shortcuts')
@mcp.tool
def read_summary_info(msi_path: str) -> Dict[str, Optional[str]]:
"""Return MSI Summary Information stream fields."""
return _run_powershell(msi_path, 'summary')
@mcp.tool
def list_tables(msi_path: str) -> List[str]:
"""Return the list of table names present in the MSI."""
return _run_powershell(msi_path, 'tables')
@mcp.tool
def read_table(msi_path: str, table: str) -> List[Dict[str, Optional[str]]]:
"""Return all rows for the given table name."""
return _run_powershell(msi_path, 'table_rows', table=table)
# Registry tools (no Win32_Product)
REG_SCRIPT_PATH = os.path.join(os.path.dirname(__file__), 'scripts', 'QueryInstalledApps.ps1')
def _run_registry(action: str, name: Optional[str] = None, product_code: Optional[str] = None):
if not os.path.exists(REG_SCRIPT_PATH):
raise FileNotFoundError(f"Registry script missing: {REG_SCRIPT_PATH}")
args = ['powershell', '-NoProfile', '-ExecutionPolicy', 'Bypass', '-File', REG_SCRIPT_PATH, '-Action', action]
if name:
args += ['-Name', name]
if product_code:
args += ['-ProductCode', product_code]
completed = subprocess.run(args, capture_output=True, text=True, check=False)
if completed.returncode != 0:
stderr = (completed.stderr or '').strip()
stdout = (completed.stdout or '').strip()
raise RuntimeError(f"PowerShell error ({completed.returncode}): {stderr or stdout}")
stdout = (completed.stdout or '').strip()
if not stdout:
raise RuntimeError('No output from registry query')
return json.loads(stdout)
@mcp.tool
def registry_list_installed() -> list:
"""List installed applications via registry (HKLM/HKCU, 32/64-bit)."""
return _run_registry('list')
@mcp.tool
def registry_find_by_name(name: str) -> list:
"""Find installed applications by partial name (case-insensitive)."""
return _run_registry('find', name=name)
@mcp.tool
def registry_find_by_product_code(product_code: str) -> list:
"""Find installed applications by exact ProductCode GUID."""
return _run_registry('by_product_code', product_code=product_code)
@mcp.tool
def registry_get_uninstall_string(name: str) -> List[Dict[str, Optional[str]]]:
"""Return uninstall information (UninstallString/QuietUninstallString/ProductCode) for apps matching name."""
return _run_registry('find', name=name)
# Optional: extract files using lessmsi if available
@mcp.tool
def extract_msi_files(msi_path: str, output_dir: str) -> Dict[str, Any]:
"""Extract files from MSI using lessmsi (must be installed in PATH)."""
lessmsi = shutil.which('lessmsi') or shutil.which('lessmsi.exe')
if not lessmsi:
raise RuntimeError('lessmsi not found in PATH. Install lessmsi to enable extraction.')
os.makedirs(output_dir, exist_ok=True)
completed = subprocess.run([lessmsi, 'x', msi_path, output_dir], capture_output=True, text=True)
return {
'returncode': completed.returncode,
'stdout': completed.stdout,
'stderr': completed.stderr,
}
# Local testing helpers (callable functions mirroring the tools)
def local_read_msi_metadata(msi_path: str) -> Dict[str, Optional[str]]:
meta = _run_powershell_simple(msi_path)
return {
'ProductCode': meta.get('ProductCode'),
'ProductName': meta.get('ProductName'),
'ProductVersion': meta.get('ProductVersion'),
'UpgradeCode': meta.get('UpgradeCode'),
'Manufacturer': meta.get('Manufacturer'),
}
def local_get_product_code(msi_path: str) -> str:
pc = _run_powershell_simple(msi_path).get('ProductCode')
if not pc:
raise RuntimeError('Failed to read ProductCode from MSI')
return pc
def local_get_product_name(msi_path: str) -> str:
name = _run_powershell_simple(msi_path).get('ProductName')
if not name:
raise RuntimeError('Failed to read ProductName from MSI')
return name
def local_get_product_version(msi_path: str) -> str:
ver = _run_powershell_simple(msi_path).get('ProductVersion')
if not ver:
raise RuntimeError('Failed to read ProductVersion from MSI')
return ver
def local_suggest_silent_commands(msi_path: str) -> Dict[str, str]:
pc = local_get_product_code(msi_path)
return {
'install': f'msiexec /i "{msi_path}" /qn /norestart',
'uninstall': f'msiexec /x {pc} /qn /norestart',
}
def local_build_install_command(msi_path: str, log_path: Optional[str] = None, extra_args: Optional[str] = None) -> str:
parts = ["msiexec", "/i", f'"{msi_path}"', "/qn", "/norestart"]
if log_path:
parts.extend(["/L*v", log_path])
if extra_args:
parts.append(extra_args)
return " ".join(parts)
def local_build_uninstall_command(product_code: str, log_path: Optional[str] = None, extra_args: Optional[str] = None) -> str:
parts = ["msiexec", "/x", product_code, "/qn", "/norestart"]
if log_path:
parts.extend(["/L*v", log_path])
if extra_args:
parts.append(extra_args)
return " ".join(parts)
def local_list_properties(msi_path: str) -> Dict[str, Optional[str]]:
return _run_powershell(msi_path, 'properties')
def local_list_features(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell(msi_path, 'features')
def local_list_components(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell(msi_path, 'components')
def local_list_files(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell(msi_path, 'files')
def local_list_custom_actions(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell(msi_path, 'custom_actions')
def local_list_shortcuts(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell(msi_path, 'shortcuts')
def local_read_summary_info(msi_path: str) -> Dict[str, Optional[str]]:
return _run_powershell(msi_path, 'summary')
def _run_powershell_advanced(msi_path: str, action: str):
"""Run advanced PowerShell script for complex MSI operations."""
script_path = os.path.join(os.path.dirname(__file__), 'scripts', 'ReadMsiAdvanced.ps1')
if not os.path.exists(script_path):
raise FileNotFoundError(f"Advanced script missing: {script_path}")
args = [
'powershell', '-NoProfile', '-ExecutionPolicy', 'Bypass',
'-File', script_path,
msi_path,
'-Action', action,
]
completed = subprocess.run(args, capture_output=True, text=True, check=False)
if completed.returncode != 0:
stderr = (completed.stderr or '').strip()
stdout = (completed.stdout or '').strip()
raise RuntimeError(f"PowerShell error ({completed.returncode}): {stderr or stdout}")
stdout = (completed.stdout or '').strip()
if not stdout:
raise RuntimeError('No output from PowerShell reader')
return json.loads(stdout)
def local_list_tables(msi_path: str) -> List[str]:
return _run_powershell_advanced(msi_path, 'tables')
def local_read_table(msi_path: str, table: str) -> List[Dict[str, Optional[str]]]:
# For now, return empty list - table reading needs more complex implementation
return []
def local_list_features(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell_advanced(msi_path, 'features')
def local_list_components(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell_advanced(msi_path, 'components')
def local_list_files(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell_advanced(msi_path, 'files')
def local_list_shortcuts(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell_advanced(msi_path, 'shortcuts')
def local_list_custom_actions(msi_path: str) -> List[Dict[str, Optional[str]]]:
return _run_powershell_advanced(msi_path, 'custom_actions')
def local_read_summary_info(msi_path: str) -> Dict[str, Optional[str]]:
return _run_powershell_advanced(msi_path, 'summary')
def local_registry_list_installed() -> list:
return _run_registry('list')
def local_registry_find_by_name(name: str) -> list:
return _run_registry('find', name=name)
def local_registry_find_by_product_code(product_code: str) -> list:
return _run_registry('by_product_code', product_code=product_code)
def local_registry_get_uninstall_string(name: str) -> list:
return _run_registry('find', name=name)
def local_extract_msi_files(msi_path: str, output_dir: str) -> Dict[str, Any]:
lessmsi = shutil.which('lessmsi') or shutil.which('lessmsi.exe')
if not lessmsi:
raise RuntimeError('lessmsi not found in PATH. Install lessmsi to enable extraction.')
os.makedirs(output_dir, exist_ok=True)
completed = subprocess.run([lessmsi, 'x', msi_path, output_dir], capture_output=True, text=True)
return {
'returncode': completed.returncode,
'stdout': completed.stdout,
'stderr': completed.stderr,
}
if __name__ == '__main__':
mcp.run()