#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3
import json
import logging
from typing import List, Dict, Optional, Tuple, Set
from datetime import datetime
from .xml_parser import XMLParser
logger = logging.getLogger(__name__)
class NotificationParser:
"""Windows Notification Database 파서"""
def __init__(self, db_path: str):
"""
Args:
db_path: wpndatabase.db 파일 경로
"""
self.db_path = db_path
self.conn = None
self.xml_parser = XMLParser()
self._column_cache: Dict[str, Set[str]] = {}
logger.info(f"Initialized NotificationParser: {db_path}")
def connect(self) -> bool:
"""데이터베이스 연결"""
try:
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
logger.info("Connected to notification database")
return True
except Exception as e:
logger.error(f"Failed to connect to database: {e}")
return False
def _get_columns(self, table_name: str) -> Set[str]:
"""테이블의 컬럼 목록을 조회 (캐싱 포함)"""
if table_name in self._column_cache:
return self._column_cache[table_name]
if not self.conn:
return set()
try:
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = {row['name'] for row in cursor.fetchall()}
self._column_cache[table_name] = columns
return columns
except Exception as e:
logger.error(f"Failed to get columns for {table_name}: {e}")
return set()
def get_all_notifications(self, limit: int = 1000) -> List[Dict]:
"""
모든 알림 조회
"""
if not self.conn:
if not self.connect():
return []
try:
cursor = self.conn.cursor()
# 동적 컬럼 처리 (Win10/11 버전 차이 대응)
noti_cols = self._get_columns("Notification")
# 컬럼이 없으면 NULL로 대체하여 조회
tag_col = "n.Tag" if "Tag" in noti_cols else "NULL as Tag"
group_col = "n.\"Group\"" if "Group" in noti_cols else "NULL as \"Group\""
query = f"""
SELECT
n.Id,
n.HandlerId,
nh.PrimaryId as AppId,
n.Type,
n.Payload,
n.PayloadType,
{tag_col},
{group_col},
n.ArrivalTime,
n.ExpiryTime
FROM Notification n
LEFT JOIN NotificationHandler nh ON n.HandlerId = nh.RecordId
ORDER BY n.ArrivalTime DESC
LIMIT ?
"""
cursor.execute(query, (limit,))
rows = cursor.fetchall()
notifications = []
for row in rows:
notif = {
"id": row["Id"],
"handler_id": row["HandlerId"],
"app_id": row["AppId"] if row["AppId"] else "Unknown",
"type": row["Type"],
"tag": row["Tag"],
"group": row["Group"],
# FILETIME 변환 적용
"arrival_time": self._format_timestamp(row["ArrivalTime"]),
"expiry_time": self._format_timestamp(row["ExpiryTime"]),
"payload": None,
"parsed_content": None
}
# Payload 파싱
if row["Payload"]:
try:
payload_text = row["Payload"].decode('utf-8', errors='ignore')
notif["payload"] = payload_text
# XML 파싱
parsed = self.xml_parser.parse_toast_xml(payload_text)
if parsed:
notif["parsed_content"] = parsed
except Exception as e:
# 페이로드 오류는 경고만 남기고 계속 진행
pass
notifications.append(notif)
logger.info(f"Retrieved {len(notifications)} notifications")
return notifications
except Exception as e:
logger.error(f"Failed to get all notifications: {e}")
return []
def search_notifications(self, keyword: str, limit: int = 100) -> List[Dict]:
"""키워드로 알림 검색"""
if not self.conn:
if not self.connect():
return []
try:
cursor = self.conn.cursor()
query = """
SELECT
n.Id,
nh.PrimaryId as AppId,
n.Payload,
n.ArrivalTime
FROM Notification n
LEFT JOIN NotificationHandler nh ON n.HandlerId = nh.RecordId
WHERE n.Payload LIKE ?
ORDER BY n.ArrivalTime DESC
LIMIT ?
"""
cursor.execute(query, (f'%{keyword}%', limit))
rows = cursor.fetchall()
results = []
for row in rows:
result = {
"id": row["Id"],
"app_id": row["AppId"] if row["AppId"] else "Unknown",
"arrival_time": self._format_timestamp(row["ArrivalTime"]),
"payload": None,
"parsed_content": None,
"matched_keyword": keyword
}
if row["Payload"]:
try:
payload_text = row["Payload"].decode('utf-8', errors='ignore')
result["payload"] = payload_text
parsed = self.xml_parser.parse_toast_xml(payload_text)
if parsed:
result["parsed_content"] = parsed
except:
pass
results.append(result)
logger.info(f"Found {len(results)} notifications matching '{keyword}'")
return results
except Exception as e:
logger.error(f"Failed to search notifications: {e}")
return []
def get_notifications_by_app(self, app_id: str, limit: int = 100) -> List[Dict]:
"""특정 앱의 알림 조회"""
if not self.conn:
if not self.connect():
return []
try:
cursor = self.conn.cursor()
noti_cols = self._get_columns("Notification")
tag_col = "n.Tag" if "Tag" in noti_cols else "NULL as Tag"
group_col = "n.\"Group\"" if "Group" in noti_cols else "NULL as \"Group\""
query = f"""
SELECT
n.Id,
nh.PrimaryId as AppId,
n.Payload,
n.ArrivalTime,
{tag_col},
{group_col}
FROM Notification n
LEFT JOIN NotificationHandler nh ON n.HandlerId = nh.RecordId
WHERE nh.PrimaryId LIKE ?
ORDER BY n.ArrivalTime DESC
LIMIT ?
"""
cursor.execute(query, (f'%{app_id}%', limit))
rows = cursor.fetchall()
notifications = []
for row in rows:
notif = {
"id": row["Id"],
"app_id": row["AppId"],
"arrival_time": self._format_timestamp(row["ArrivalTime"]),
"tag": row["Tag"],
"group": row["Group"],
"parsed_content": None
}
if row["Payload"]:
try:
payload_text = row["Payload"].decode('utf-8', errors='ignore')
parsed = self.xml_parser.parse_toast_xml(payload_text)
if parsed:
notif["parsed_content"] = parsed
except:
pass
notifications.append(notif)
logger.info(f"Found {len(notifications)} notifications for app '{app_id}'")
return notifications
except Exception as e:
logger.error(f"Failed to get notifications by app: {e}")
return []
def get_notification_timeline(self, start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: int = 1000) -> List[Dict]:
"""
시간 범위로 알림 타임라인 조회
Args:
start_time: 시작 시간 (Unix timestamp)
end_time: 종료 시간 (Unix timestamp)
limit: 최대 결과 수
"""
if not self.conn:
if not self.connect():
return []
try:
cursor = self.conn.cursor()
if start_time and end_time:
# Unix Timestamp -> Windows FILETIME 변환
ft_start = self._unix_to_filetime(start_time)
ft_end = self._unix_to_filetime(end_time)
query = """
SELECT
n.Id,
nh.PrimaryId as AppId,
n.Payload,
n.ArrivalTime
FROM Notification n
LEFT JOIN NotificationHandler nh ON n.HandlerId = nh.RecordId
WHERE n.ArrivalTime BETWEEN ? AND ?
ORDER BY n.ArrivalTime ASC
LIMIT ?
"""
cursor.execute(query, (ft_start, ft_end, limit))
else:
query = """
SELECT
n.Id,
nh.PrimaryId as AppId,
n.Payload,
n.ArrivalTime
FROM Notification n
LEFT JOIN NotificationHandler nh ON n.HandlerId = nh.RecordId
ORDER BY n.ArrivalTime ASC
LIMIT ?
"""
cursor.execute(query, (limit,))
rows = cursor.fetchall()
timeline = []
for row in rows:
event = {
"id": row["Id"],
"app_id": row["AppId"] if row["AppId"] else "Unknown",
"timestamp": self._format_timestamp(row["ArrivalTime"]),
# FILETIME을 다시 Unix Time으로 변환하여 반환
"unix_time": self._filetime_to_unix(row["ArrivalTime"]),
"content": None
}
if row["Payload"]:
try:
payload_text = row["Payload"].decode('utf-8', errors='ignore')
parsed = self.xml_parser.parse_toast_xml(payload_text)
if parsed:
event["content"] = parsed
except:
pass
timeline.append(event)
logger.info(f"Retrieved timeline with {len(timeline)} events")
return timeline
except Exception as e:
logger.error(f"Failed to get notification timeline: {e}")
return []
def get_statistics(self) -> Dict:
"""알림 통계 생성"""
if not self.conn:
if not self.connect():
return {}
try:
cursor = self.conn.cursor()
# 총 알림 수
cursor.execute("SELECT COUNT(*) as total FROM Notification")
row = cursor.fetchone()
total = row["total"] if row else 0
# 앱별 알림 수
cursor.execute("""
SELECT
nh.PrimaryId as AppId,
COUNT(*) as Count
FROM Notification n
LEFT JOIN NotificationHandler nh ON n.HandlerId = nh.RecordId
GROUP BY nh.PrimaryId
ORDER BY Count DESC
LIMIT 10
""")
app_stats = [{"app": row["AppId"] or "Unknown", "count": row["Count"]}
for row in cursor.fetchall()]
# 날짜 범위 (FILETIME 기준 MIN/MAX)
cursor.execute("""
SELECT
MIN(ArrivalTime) as first_time,
MAX(ArrivalTime) as last_time
FROM Notification
WHERE ArrivalTime > 0
""")
time_range = cursor.fetchone()
# 시간대별 분포 (FILETIME -> Unix -> Hour 변환)
# SQLite 내부 연산: (FILETIME / 10,000,000) - 11644473600 = Unix Timestamp
# 정확도를 위해 Python에서 처리하지 않고 SQL 산술 연산을 사용
cursor.execute("""
SELECT
strftime('%H', datetime((ArrivalTime / 10000000) - 11644473600, 'unixepoch')) as Hour,
COUNT(*) as Count
FROM Notification
WHERE ArrivalTime > 0
GROUP BY Hour
ORDER BY Hour
""")
hourly_dist = [{"hour": row["Hour"], "count": row["Count"]}
for row in cursor.fetchall() if row["Hour"] is not None]
stats = {
"total_notifications": total,
"date_range": {
"start": self._format_timestamp(time_range["first_time"]) if time_range else None,
"end": self._format_timestamp(time_range["last_time"]) if time_range else None
},
"top_apps": app_stats,
"hourly_distribution": hourly_dist
}
logger.info("Generated notification statistics")
return stats
except Exception as e:
logger.error(f"Failed to get statistics: {e}")
return {}
def list_apps(self) -> List[Dict]:
"""알림을 보낸 앱 목록"""
if not self.conn:
if not self.connect():
return []
try:
cursor = self.conn.cursor()
query = """
SELECT DISTINCT
nh.PrimaryId as AppId,
COUNT(n.Id) as NotificationCount
FROM NotificationHandler nh
LEFT JOIN Notification n ON nh.RecordId = n.HandlerId
GROUP BY nh.PrimaryId
ORDER BY NotificationCount DESC
"""
cursor.execute(query)
rows = cursor.fetchall()
apps = [{"app_id": row["AppId"] or "Unknown",
"notification_count": row["NotificationCount"]}
for row in rows]
logger.info(f"Found {len(apps)} apps")
return apps
except Exception as e:
logger.error(f"Failed to list apps: {e}")
return []
def extract_sensitive_content(self, keywords: Optional[List[str]] = None) -> Dict:
"""민감 정보 추출"""
if keywords is None:
keywords = {
"passwords": ["password", "pwd", "passcode", "pin"],
"codes": ["code", "verification", "OTP", "2FA", "verify"],
"emails": ["@"],
"confidential": ["confidential", "secret", "private", "classified"]
}
results = {}
for category, kw_list in keywords.items():
category_results = []
for keyword in kw_list:
matches = self.search_notifications(keyword, limit=50)
category_results.extend(matches)
# 중복 제거 (ID 기준)
unique_results = {r["id"]: r for r in category_results}.values()
results[category] = list(unique_results)
logger.info(f"Extracted sensitive content in {len(results)} categories")
return results
def _format_timestamp(self, filetime: Optional[int]) -> Optional[str]:
"""Windows FILETIME을 ISO 형식으로 변환"""
unix_ts = self._filetime_to_unix(filetime)
if unix_ts is None:
return None
try:
return datetime.fromtimestamp(unix_ts).isoformat()
except:
return None
def _filetime_to_unix(self, filetime: Optional[int]) -> Optional[float]:
"""Windows FILETIME -> Unix Timestamp (Float)"""
if filetime is None or filetime == 0:
return None
try:
# (FILETIME - 116444736000000000) / 10,000,000
us = (filetime - 116444736000000000) // 10
return us / 1000000.0
except Exception:
return None
def _unix_to_filetime(self, unix_ts: float) -> int:
"""Unix Timestamp -> Windows FILETIME"""
try:
return int((unix_ts * 10000000) + 116444736000000000)
except:
return 0
def close(self):
"""데이터베이스 연결 종료"""
if self.conn:
self.conn.close()
self.conn = None
self._column_cache.clear()
logger.info("Closed notification database connection")