#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pytsk3
import pyewf
import logging
import tempfile
import os
from typing import List, Dict, Optional, Generator
logger = logging.getLogger(__name__)
class EWFImgInfo(pytsk3.Img_Info):
"""E01 이미지를 pytsk3에서 사용 가능하게 하는 래퍼 클래스"""
def __init__(self, ewf_handle):
self._ewf_handle = ewf_handle
super(EWFImgInfo, self).__init__(url="", type=pytsk3.TSK_IMG_TYPE_EXTERNAL)
def close(self):
self._ewf_handle.close()
def read(self, offset, size):
self._ewf_handle.seek(offset)
return self._ewf_handle.read(size)
def get_size(self):
return self._ewf_handle.get_media_size()
class E01Handler:
"""E01 이미지 처리 및 파일 추출"""
def __init__(self):
self.ewf_handle = None
self.img_info = None
self.fs_info = None
self.current_offset = 0
logger.info("Initialized E01Handler")
def open_e01(self, e01_path: str, partition_offset: int = 0) -> bool:
"""E01 이미지 열기"""
try:
# E01 파일 glob (분할된 E01 파일 처리)
filenames = pyewf.glob(e01_path)
if not filenames:
logger.error(f"No E01 files found: {e01_path}")
return False
# EWF 핸들 열기
self.ewf_handle = pyewf.handle()
self.ewf_handle.open(filenames)
# pytsk3 이미지 정보 생성
self.img_info = EWFImgInfo(self.ewf_handle)
# 1. 사용자가 명시한 오프셋이 있는 경우
if partition_offset > 0:
try:
self.fs_info = pytsk3.FS_Info(self.img_info, offset=partition_offset)
self.current_offset = partition_offset
logger.info(f"Opened filesystem at specified offset: {partition_offset}")
return True
except Exception as e:
logger.error(f"Failed to open FS at offset {partition_offset}: {e}")
return False
# 2. 오프셋이 0인 경우 (자동 탐지 필요하지만, 여기서는 이미지 핸들만 열어둠)
# 실제 파티션 마운트는 find_notification_dbs나 iter_partitions에서 수행
self.current_offset = 0
logger.info(f"Opened E01 image successfully: {e01_path}")
return True
except Exception as e:
logger.error(f"Failed to open E01: {e}")
return False
def _get_ntfs_partitions(self) -> List[int]:
"""이미지 내의 모든 NTFS 파티션 오프셋 찾기"""
offsets = []
try:
volume = pytsk3.Volume_Info(self.img_info)
for part in volume:
# 할당된 파티션만 체크
if part.flags & pytsk3.TSK_VS_PART_FLAG_ALLOC:
offset = part.start * 512
try:
# 파일시스템 타입 체크
fs = pytsk3.FS_Info(self.img_info, offset=offset)
if fs.info.ftype == pytsk3.TSK_FS_TYPE_NTFS:
offsets.append(offset)
except:
continue
except Exception as e:
# 볼륨 시스템이 없는 경우(단일 파티션 이미지 등) 0번 오프셋 시도
try:
fs = pytsk3.FS_Info(self.img_info, offset=0)
if fs.info.ftype == pytsk3.TSK_FS_TYPE_NTFS:
offsets.append(0)
except:
pass
return offsets
def find_notification_dbs(self, max_results: int = 10) -> List[Dict]:
"""
모든 파티션을 순회하며 wpndatabase.db 찾기
"""
results = []
target_paths = [
"/AppData/Local/Microsoft/Windows/Notifications/wpndatabase.db", # Win 10/11 User
"/ProgramData/Microsoft/Windows/Notifications/wpndatabase.db" # System
]
# 1. 현재 마운트된 FS가 있으면 거기서 검색
if self.fs_info:
logger.info(f"Scanning current filesystem at offset {self.current_offset}...")
results.extend(self._scan_fs(self.fs_info, target_paths, self.current_offset))
# 2. 결과가 없거나 부족하면 모든 NTFS 파티션 검색 (핵심 로직 변경)
if len(results) == 0:
logger.info("No DBs found in current/default FS. Scanning ALL NTFS partitions...")
partitions = sorted(
self._get_ntfs_partitions(),
key=lambda off: pytsk3.FS_Info(self.img_info, offset=off).info.block_count,
reverse=True
)
for offset in partitions:
# 이미 스캔한 오프셋 제외
if self.fs_info and offset == self.current_offset:
continue
try:
logger.info(f"Mounting partition at offset {offset}...")
fs = pytsk3.FS_Info(self.img_info, offset=offset)
# 해당 파티션 스캔
found = self._scan_fs(fs, target_paths, offset)
results.extend(found)
# 찾았으면 해당 FS를 메인으로 설정 (추출을 위해)
if found:
self.fs_info = fs
self.current_offset = offset
if len(results) >= max_results:
break
except Exception as e:
logger.warning(f"Failed to scan partition at {offset}: {e}")
return results[:max_results]
def _scan_fs(self, fs: pytsk3.FS_Info, target_subpaths: List[str], partition_offset: int) -> List[Dict]:
"""단일 파일시스템 스캔 헬퍼 함수"""
found_dbs = []
# 1. /Users 대소문자 무시 탐색
possible_users_dirs = ["/Users", "/users", "/USERS"]
users_dir = None
users_base_path = None
for cand in possible_users_dirs:
try:
users_dir = fs.open_dir(cand)
users_base_path = cand
break
except:
continue
if users_dir:
# Users 내부 사용자 폴더 순회
for user_entry in users_dir:
name = user_entry.info.name.name.decode("utf-8")
if name in [".", ".."]:
continue
# /Users/username/AppData/Local/... 경로
full_path = f"{users_base_path}/{name}{target_subpaths[0]}"
if self._file_exists(fs, full_path):
found_dbs.append(
self._get_file_info(fs, full_path, name, partition_offset)
)
# 2. ProgramData 검색
try:
sys_path = target_subpaths[1]
if self._file_exists(fs, sys_path):
found_dbs.append(
self._get_file_info(fs, sys_path, "SYSTEM", partition_offset)
)
except:
pass
return found_dbs
def _file_exists(self, fs, path):
try:
fs.open(path)
return True
except:
return False
def _get_file_info(self, fs, path, owner, offset):
f = fs.open(path)
return {
"username": owner,
"path": path,
"partition_offset": offset, # 중요: 추출 시 필요
"size": f.info.meta.size,
"modified": f.info.meta.mtime,
"inode": f.info.meta.addr
}
def extract_file(self, file_path: str, output_path: Optional[str] = None) -> Optional[str]:
"""파일 추출"""
if not self.fs_info:
logger.error("Filesystem not opened")
return None
try:
file_entry = self.fs_info.open(file_path)
size = file_entry.info.meta.size
if output_path is None:
tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.db')
output_path = tmp.name
tmp.close()
with open(output_path, 'wb') as f:
offset = 0
BUFF_SIZE = 1024 * 1024
while offset < size:
read_len = min(BUFF_SIZE, size - offset)
data = file_entry.read_random(offset, read_len)
if not data: break
f.write(data)
offset += len(data)
logger.info(f"Extracted: {file_path}")
return output_path
except Exception as e:
logger.error(f"Extract failed: {e}")
return None
def close(self):
if self.ewf_handle:
self.ewf_handle.close()
self.ewf_handle = None