#!/usr/bin/env python3
"""
E01 LNK MCP Server
"""
import os
import sys
import tempfile
import json
import atexit
import shutil
from typing import Any
import asyncio
# -----------------------------
# 라이브러리 임포트 및 검증
# -----------------------------
try:
import pyewf
import pytsk3
except ImportError:
print("Critical Error: pyewf or pytsk3 not installed.", file=sys.stderr)
sys.exit(1)
try:
from LnkParse3 import lnk_file
HAS_LNKPARSE = True
except ImportError:
HAS_LNKPARSE = False
print("Warning: LnkParse3 not available.", file=sys.stderr)
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# Global registry of temporary directories created while serving requests.
TEMP_DIRS = []


def cleanup_temp_dirs():
    """Delete every directory recorded in ``TEMP_DIRS`` that still exists.

    Removal is best-effort: any error raised while deleting a directory is
    ignored so that interpreter shutdown is never interrupted.
    """
    for path in TEMP_DIRS:
        if not os.path.exists(path):
            continue
        try:
            shutil.rmtree(path, ignore_errors=True)
        except Exception:
            pass


# Run the cleanup automatically when the interpreter exits.
atexit.register(cleanup_temp_dirs)
# -----------------------------
# [핵심] E01 Wrapper & FS Logic
# -----------------------------
class EWFImgInfo(pytsk3.Img_Info):
    """Expose an already-open pyewf handle to pytsk3 as an ``Img_Info`` source.

    The image is registered with pytsk3 as ``TSK_IMG_TYPE_EXTERNAL`` and the
    ``read`` / ``get_size`` / ``close`` methods below forward to the wrapped
    pyewf handle.
    """

    def __init__(self, ewf_handle):
        """Store *ewf_handle* and initialise the pytsk3 base as an external image."""
        # The handle must be stored before the base initialiser runs, because
        # the other methods rely on it.
        self._ewf_handle = ewf_handle
        super(EWFImgInfo, self).__init__(url="", type=pytsk3.TSK_IMG_TYPE_EXTERNAL)

    def close(self):
        """Close the wrapped pyewf handle so the image segments are released."""
        self._ewf_handle.close()

    def read(self, offset, size):
        """Return up to *size* bytes of media data starting at byte *offset*."""
        self._ewf_handle.seek(offset)
        return self._ewf_handle.read(size)

    def get_size(self):
        """Return the media size reported by the pyewf handle."""
        return self._ewf_handle.get_media_size()
def open_e01_image(e01_path: str):
    """Open an E01 image and return a pytsk3 ``FS_Info`` for its file system.

    If the image has a partition table, the file system of the largest
    allocated partition that can be opened is returned. If there is no
    partition table, or no partition holds an openable file system, the
    image is opened as a single file system starting at offset 0.

    Args:
        e01_path: Path to the first segment of the E01 image.

    Returns:
        A ``pytsk3.FS_Info`` handle.

    Raises:
        FileNotFoundError: *e01_path* does not exist.
        RuntimeError: pyewf could not open the image, or no file system
            could be opened from it.
    """
    if not os.path.exists(e01_path):
        raise FileNotFoundError(f"File not found: {e01_path}")

    # Let pyewf discover the segment files; fall back to the single path.
    filenames = pyewf.glob(e01_path) or [e01_path]

    try:
        ewf_handle = pyewf.handle()
        ewf_handle.open(filenames)
    except Exception as e:
        raise RuntimeError(f"Failed to open E01 with pyewf: {e}") from e

    img_info = EWFImgInfo(ewf_handle)

    # First attempt: walk the partition table and keep the largest partition
    # whose file system opens successfully.
    try:
        volume = pytsk3.Volume_Info(img_info)
        # Partition addresses are expressed in volume-system blocks; use the
        # block size reported for this volume rather than assuming 512 bytes.
        sector_size = getattr(volume.info, "block_size", 0) or 512
        best_fs = None
        best_len = 0
        for part in volume:
            if not part.flags & pytsk3.TSK_VS_PART_FLAG_ALLOC:
                continue
            # A partition that is not larger than the current best can never
            # be selected, so do not spend time opening it.
            if part.len <= best_len:
                continue
            try:
                candidate = pytsk3.FS_Info(img_info, offset=part.start * sector_size)
            except Exception:
                continue
            best_fs = candidate
            best_len = part.len
        if best_fs is not None:
            return best_fs
    except Exception:
        # No readable partition table (e.g. a logical image): fall through to
        # the whole-image attempt below.
        pass

    # Second attempt: treat the whole image as one file system.
    try:
        return pytsk3.FS_Info(img_info)
    except Exception as e:
        raise RuntimeError(f"Could not open filesystem: {e}") from e
# -----------------------------
# LNK 파싱 및 탐색 로직
# -----------------------------
def parse_lnk_file(file_path: str) -> dict:
    """Parse a local ``.lnk`` file with LnkParse3 and return selected fields.

    Args:
        file_path: Path of the LNK file on the local file system.

    Returns:
        A dict with the keys ``header``, ``link_info``, ``string_data`` and
        ``extra_data`` (the last one is always left empty here). On any
        failure, including LnkParse3 being unavailable, a dict of the form
        ``{"error": "<message>"}`` is returned instead; no exception escapes.
    """
    if not HAS_LNKPARSE:
        return {"error": "LnkParse3 not installed"}

    try:
        with open(file_path, "rb") as f:
            raw = f.read()
        lnk = lnk_file.LnkFile(indata=raw)

        res = {
            "header": {},
            "link_info": {},
            "string_data": {},
            "extra_data": {},
        }

        # Header values are stringified so they can be serialised later.
        # NOTE(review): assumes ``lnk.header`` is mapping-like (has .items());
        # confirm against the installed LnkParse3 version.
        if lnk.header:
            res["header"] = {k: str(v) for k, v in lnk.header.items()}

        if lnk.string_data:
            res["string_data"] = lnk.string_data

        # The attribute may exist but be None when the shortcut carries no
        # link-info block; guard against that so the header and string data
        # gathered above are not discarded by the except clause below.
        link_info = getattr(lnk, "link_info", None)
        if link_info is not None:
            res["link_info"] = {
                "local_base_path": link_info.get("local_base_path"),
                "common_path": link_info.get("common_path"),
            }

        return res
    except Exception as e:
        return {"error": str(e)}
def lnk_to_dfir_summary(lnk_data: dict, original_name: str, inode: int) -> dict:
    """Condense parsed LNK data into a short summary for analysis.

    Args:
        lnk_data: Parsed LNK dict with optional ``header``, ``string_data``
            and ``link_info`` sections, or an ``{"error": ...}`` dict.
        original_name: File name of the LNK inside the image.
        inode: Metadata address of the LNK inside the image.

    Returns:
        *lnk_data* itself, unchanged, when it contains an ``"error"`` key.
        Otherwise a new dict with ``lnk_file``, ``inode``, ``timestamps``,
        ``target_path`` and ``arguments``. Fields that are absent from the
        input are reported as ``None``.
    """
    if "error" in lnk_data:
        return lnk_data

    def section(key):
        # ``dict.get(key, {})`` still yields None (or another non-mapping)
        # when the key is present, so normalise anything without ``.get``.
        value = lnk_data.get(key)
        return value if hasattr(value, "get") else {}

    header = section("header")
    s_data = section("string_data")
    l_info = section("link_info")

    return {
        "lnk_file": original_name,
        "inode": inode,
        "timestamps": {
            "modified": header.get("modified_time"),
            "accessed": header.get("access_time"),
            "created": header.get("creation_time"),
        },
        # Prefer the absolute path from link info; fall back to the relative
        # path stored in the string data.
        "target_path": l_info.get("local_base_path") or s_data.get("relative_path"),
        "arguments": s_data.get("command_line_arguments"),
    }
# Directory entries that are neither reported nor descended into.
_WALK_SKIP_NAMES = frozenset({".", "..", "$OrphanFiles", "$MFT", "$LogFile"})


def _entry_name(entry):
    """Return the decoded name of a directory entry, or None if it has none."""
    info = getattr(entry, "info", None)
    name_struct = getattr(info, "name", None)
    try:
        return name_struct.name.decode(errors="ignore")
    except Exception:
        return None


def walk_fs(directory, results: list, max_results: int, current_path: str, user_filter: str = None):
    """Recursively collect ``.lnk`` files below *directory* into *results*.

    Args:
        directory: Iterable of directory entries (a pytsk3 directory object).
        results: Output list; each hit is appended as
            ``{"entry": <entry>, "path": <full path>}``.
        max_results: Collection stops once *results* holds this many items.
        current_path: Path of *directory* inside the image, used as prefix.
        user_filter: Optional case-insensitive substring a file's full path
            must contain to be collected. Directories are always descended
            into regardless of the filter.

    The walk is best-effort: entries or directories that cannot be read are
    skipped and never raise to the caller.
    """
    if len(results) >= max_results:
        return

    # Lowercase the filter once instead of once per entry.
    needle = user_filter.lower() if user_filter else None

    try:
        for entry in directory:
            if len(results) >= max_results:
                break

            name = _entry_name(entry)
            if name is None or name in _WALK_SKIP_NAMES:
                continue

            # Entries can lack metadata; skip just that entry instead of
            # letting the AttributeError abort the rest of this directory.
            meta = entry.info.meta
            if meta is None:
                continue

            full_path = f"{current_path}/{name}".replace("//", "/")

            if meta.type == pytsk3.TSK_FS_META_TYPE_REG:
                matches = needle is None or needle in full_path.lower()
                if matches and name.lower().endswith(".lnk"):
                    results.append({"entry": entry, "path": full_path})
            elif meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                try:
                    walk_fs(entry.as_directory(), results, max_results, full_path, user_filter)
                except Exception:
                    # Unreadable subdirectory: keep going with its siblings.
                    continue
    except Exception:
        # Iterating the directory itself failed; keep what was collected.
        return
# -----------------------------
# MCP 서버 정의 (5개 툴 모두 포함)
# -----------------------------
server = Server("e01-lnk-tool")


@server.list_tools()
async def list_tools() -> list[Tool]:
    """Return the five tool definitions this server advertises.

    Each tool takes a JSON object as input; the table below gives, per tool,
    its name, description, property schemas and required property names.
    """
    catalogue = (
        (
            "scan_lnk_in_e01",
            "E01 이미지 내 LNK 파일 스캔 (기본 정보)",
            {
                "e01_path": {"type": "string"},
                "max_results": {"type": "integer", "default": 1000},
            },
            ["e01_path"],
        ),
        (
            "extract_lnk_by_inode",
            "특정 Inode의 LNK 파일 추출",
            {
                "e01_path": {"type": "string"},
                "inode": {"type": "integer"},
            },
            ["e01_path", "inode"],
        ),
        (
            "parse_lnk",
            "로컬 LNK 파일 파싱",
            {"file_path": {"type": "string"}},
            ["file_path"],
        ),
        (
            "auto_extract_and_parse_lnk",
            "[고급] E01 내 모든 LNK 자동 추출 및 파싱 (DFIR 요약)",
            {
                "e01_path": {"type": "string"},
                "max_results": {"type": "integer", "default": 100},
                "dfir_summary": {"type": "boolean", "default": True},
            },
            ["e01_path"],
        ),
        (
            "extract_lnk_timeline",
            "[고급] LNK 파일 타임라인 분석 (경로 필터링 가능)",
            {
                "e01_path": {"type": "string"},
                "max_results": {"type": "integer", "default": 1000},
                "user_filter": {"type": "string", "description": "예: 'Users/John'"},
            },
            ["e01_path"],
        ),
    )

    tools = []
    for tool_name, summary, properties, required in catalogue:
        schema = {"type": "object", "properties": properties, "required": required}
        tools.append(Tool(name=tool_name, description=summary, inputSchema=schema))
    return tools
# Tools that operate on an E01 image and therefore need ``e01_path``.
_E01_TOOL_NAMES = (
    "scan_lnk_in_e01",
    "extract_lnk_by_inode",
    "auto_extract_and_parse_lnk",
    "extract_lnk_timeline",
)

# Number of bytes copied per read when streaming a file out of the image.
_EXTRACT_CHUNK_SIZE = 1024 * 1024


def _json_reply(payload, indent=None):
    """Wrap *payload* as the single JSON text item a tool call returns."""
    # default=str keeps the reply serialisable when a parser hands back a
    # value json does not natively support.
    text = json.dumps(payload, indent=indent, default=str)
    return [TextContent(type="text", text=text)]


def _result_limit(args, default):
    """Return the integer ``max_results`` argument, or *default* if unset/null."""
    value = args.get("max_results")
    return default if value is None else int(value)


def _find_lnk_entries(e01_path, limit, user_filter=None):
    """Open the image and return ``(fs, entries)`` for up to *limit* LNK files."""
    fs = open_e01_image(e01_path)
    found = []
    walk_fs(fs.open_dir(path="/"), found, limit, "/", user_filter)
    # fs is returned as well so the caller keeps it referenced while it
    # works with the collected entries.
    return fs, found


def _dump_inode(fs, inode, dest_dir):
    """Copy the file at *inode* into *dest_dir* and return the local path."""
    handle = fs.open_meta(inode=inode)
    remaining = int(handle.info.meta.size)
    out_path = os.path.join(dest_dir, f"{inode}.lnk")
    offset = 0
    with open(out_path, "wb") as out:
        # Stream in chunks: avoids holding a large file in memory and skips
        # the read call entirely for zero-length files.
        while remaining > 0:
            chunk = handle.read_random(offset, min(_EXTRACT_CHUNK_SIZE, remaining))
            if not chunk:
                break
            out.write(chunk)
            offset += len(chunk)
            remaining -= len(chunk)
    return out_path


@server.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Dispatch an MCP tool call and return its result as JSON text.

    Args:
        name: One of the tool names advertised by ``list_tools``.
        arguments: Mapping of tool arguments; ``None`` is treated as empty.

    Returns:
        A one-element list holding a ``TextContent`` with a JSON document.
        Failures never raise: they are reported as
        ``{"success": false, "error": "Error: ..."}``.
    """
    args = arguments or {}
    try:
        # parse_lnk works on a local file and needs no image.
        if name == "parse_lnk":
            return _json_reply(parse_lnk_file(args["file_path"]), indent=2)

        if name not in _E01_TOOL_NAMES:
            return _json_reply({"success": False, "error": "Unknown tool"})

        e01_path = args.get("e01_path")
        if not e01_path:
            raise ValueError("e01_path is required")

        # 1. scan_lnk_in_e01
        if name == "scan_lnk_in_e01":
            fs, entries = _find_lnk_entries(e01_path, _result_limit(args, 1000))
            files = []
            for item in entries:
                meta = item["entry"].info.meta
                files.append({
                    "path": item["path"],
                    "inode": int(meta.addr),
                    "size": int(meta.size),
                })
            return _json_reply({"success": True, "count": len(files), "files": files}, indent=2)

        # 2. extract_lnk_by_inode
        if name == "extract_lnk_by_inode":
            fs = open_e01_image(e01_path)
            inode = int(args["inode"])
            # The extracted file is handed back to the caller, so its
            # directory must outlive this call; it is removed at exit.
            out_dir = tempfile.mkdtemp()
            TEMP_DIRS.append(out_dir)
            local_path = _dump_inode(fs, inode, out_dir)
            return _json_reply({"success": True, "local_path": local_path}, indent=2)

        # 3. auto_extract_and_parse_lnk
        if name == "auto_extract_and_parse_lnk":
            fs, entries = _find_lnk_entries(e01_path, _result_limit(args, 100))
            want_summary = args.get("dfir_summary", True)
            data = []
            failed = []
            # Extracted copies are only needed for parsing, so one scratch
            # directory is shared and deleted as soon as the batch is done.
            with tempfile.TemporaryDirectory() as scratch:
                for item in entries:
                    try:
                        inode = int(item["entry"].info.meta.addr)
                        parsed = parse_lnk_file(_dump_inode(fs, inode, scratch))
                        if want_summary:
                            summary = lnk_to_dfir_summary(parsed, item["path"].split("/")[-1], inode)
                            summary["full_path"] = item["path"]
                            data.append(summary)
                        else:
                            data.append({"path": item["path"], "parsed": parsed})
                    except Exception as exc:
                        # Keep going, but record what was skipped and why.
                        failed.append({"path": item["path"], "error": str(exc)})
            payload = {"success": True, "count": len(data), "data": data}
            if failed:
                payload["failed"] = failed
            return _json_reply(payload, indent=2)

        # 4. extract_lnk_timeline (the only remaining E01 tool)
        fs, entries = _find_lnk_entries(e01_path, _result_limit(args, 1000), args.get("user_filter"))
        timeline = []
        for item in entries:
            meta = item["entry"].info.meta
            timeline.append({
                "path": item["path"],
                "inode": int(meta.addr),
                "mtime": meta.mtime,
                "atime": meta.atime,
                "crtime": meta.crtime,
            })
        # Chronological order by modification time; a missing value sorts first.
        timeline.sort(key=lambda row: row["mtime"] or 0)
        return _json_reply({"success": True, "count": len(timeline), "timeline": timeline}, indent=2)
    except Exception as e:
        return _json_reply({"success": False, "error": f"Error: {str(e)}"})
async def main():
    """Serve the MCP server over stdio until the streams are closed."""
    async with stdio_server() as streams:
        read_stream, write_stream = streams
        init_options = server.create_initialization_options()
        await server.run(read_stream, write_stream, init_options)


if __name__ == "__main__":
    asyncio.run(main())