#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import json
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from tools.e01_handler import E01Handler
from tools.notification_parser import NotificationParser
import os
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("notification-mcp-server")
# MCP 서버 생성
app = Server("notification-mcp-server")
# 글로벌 핸들러 (재사용)
e01_handler = None
current_e01_path = None
@app.list_tools()
async def list_tools() -> list[Tool]:
"""MCP 도구 목록"""
return [
Tool(
name="scan_e01_for_notification_dbs",
description="Scan E01 image for Windows Notification databases (wpndatabase.db). Returns list of found databases with metadata.",
inputSchema={
"type": "object",
"properties": {
"e01_path": {
"type": "string",
"description": "Path to E01 image file (e.g., /evidence/disk.E01)"
},
"partition_offset": {
"type": "integer",
"description": "NTFS partition offset in bytes (default: 0 for auto-detection)",
"default": 0
},
"max_results": {
"type": "integer",
"description": "Maximum number of databases to find (default: 10)",
"default": 10
}
},
"required": ["e01_path"]
}
),
Tool(
name="extract_notification_db",
description="Extract wpndatabase.db from E01 image to temporary location for analysis.",
inputSchema={
"type": "object",
"properties": {
"e01_path": {
"type": "string",
"description": "Path to E01 image file"
},
"username": {
"type": "string",
"description": "Windows username (e.g., john.doe)"
},
"partition_offset": {
"type": "integer",
"description": "NTFS partition offset in bytes",
"default": 0
}
},
"required": ["e01_path", "username"]
}
),
Tool(
name="parse_all_notifications",
description="Parse all notifications from extracted database. Returns JSON with notification details.",
inputSchema={
"type": "object",
"properties": {
"db_path": {
"type": "string",
"description": "Path to wpndatabase.db file (extracted or local)"
},
"limit": {
"type": "integer",
"description": "Maximum number of notifications to return (default: 1000)",
"default": 1000
}
},
"required": ["db_path"]
}
),
Tool(
name="search_notifications",
description="Search notifications by keyword. Useful for finding specific content like passwords, codes, or sensitive information.",
inputSchema={
"type": "object",
"properties": {
"db_path": {
"type": "string",
"description": "Path to wpndatabase.db file"
},
"keyword": {
"type": "string",
"description": "Search keyword (e.g., 'password', 'code', 'confidential')"
},
"limit": {
"type": "integer",
"description": "Maximum results (default: 100)",
"default": 100
}
},
"required": ["db_path", "keyword"]
}
),
Tool(
name="get_notifications_by_app",
description="Get notifications from specific application (e.g., Outlook, Slack, Teams).",
inputSchema={
"type": "object",
"properties": {
"db_path": {
"type": "string",
"description": "Path to wpndatabase.db file"
},
"app_id": {
"type": "string",
"description": "Application ID or name (e.g., 'Outlook', 'chrome.exe', 'Slack')"
},
"limit": {
"type": "integer",
"description": "Maximum results (default: 100)",
"default": 100
}
},
"required": ["db_path", "app_id"]
}
),
Tool(
name="get_notification_timeline",
description="Get chronological timeline of notifications. Optionally filter by time range.",
inputSchema={
"type": "object",
"properties": {
"db_path": {
"type": "string",
"description": "Path to wpndatabase.db file"
},
"start_time": {
"type": "integer",
"description": "Start time (Unix timestamp)"
},
"end_time": {
"type": "integer",
"description": "End time (Unix timestamp)"
},
"limit": {
"type": "integer",
"description": "Maximum results (default: 1000)",
"default": 1000
}
},
"required": ["db_path"]
}
),
Tool(
name="get_notification_statistics",
description="Generate statistical analysis of notifications (total count, app distribution, time patterns).",
inputSchema={
"type": "object",
"properties": {
"db_path": {
"type": "string",
"description": "Path to wpndatabase.db file"
}
},
"required": ["db_path"]
}
),
Tool(
name="list_notification_apps",
description="List all applications that sent notifications.",
inputSchema={
"type": "object",
"properties": {
"db_path": {
"type": "string",
"description": "Path to wpndatabase.db file"
}
},
"required": ["db_path"]
}
),
Tool(
name="extract_sensitive_content",
description="Extract notifications containing sensitive keywords (passwords, codes, confidential info).",
inputSchema={
"type": "object",
"properties": {
"db_path": {
"type": "string",
"description": "Path to wpndatabase.db file"
},
"custom_keywords": {
"type": "array",
"items": {"type": "string"},
"description": "Custom keywords to search for (optional)"
}
},
"required": ["db_path"]
}
),
Tool(
name="auto_analyze_e01_notifications",
description="[HIGH-LEVEL] Automatically scan E01, extract all notification databases, and generate comprehensive analysis report.",
inputSchema={
"type": "object",
"properties": {
"e01_path": {
"type": "string",
"description": "Path to E01 image file"
},
"partition_offset": {
"type": "integer",
"description": "NTFS partition offset",
"default": 0
},
"include_sensitive": {
"type": "boolean",
"description": "Include sensitive content analysis (default: true)",
"default": True
}
},
"required": ["e01_path"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
"""도구 실행"""
try:
if name == "scan_e01_for_notification_dbs":
return await scan_e01_for_notification_dbs(arguments)
elif name == "extract_notification_db":
return await extract_notification_db(arguments)
elif name == "parse_all_notifications":
return await parse_all_notifications(arguments)
elif name == "search_notifications":
return await search_notifications(arguments)
elif name == "get_notifications_by_app":
return await get_notifications_by_app(arguments)
elif name == "get_notification_timeline":
return await get_notification_timeline(arguments)
elif name == "get_notification_statistics":
return await get_notification_statistics(arguments)
elif name == "list_notification_apps":
return await list_notification_apps(arguments)
elif name == "extract_sensitive_content":
return await extract_sensitive_content(arguments)
elif name == "auto_analyze_e01_notifications":
return await auto_analyze_e01_notifications(arguments)
else:
return [TextContent(
type="text",
text=json.dumps({"error": f"Unknown tool: {name}"}, indent=2)
)]
except Exception as e:
logger.error(f"Error in tool {name}: {e}", exc_info=True)
return [TextContent(
type="text",
text=json.dumps({"error": str(e), "tool": name}, indent=2)
)]
# =============================================================================
# 도구 구현
# =============================================================================
async def scan_e01_for_notification_dbs(args: dict):
"""E01 이미지 스캔"""
global e01_handler, current_e01_path
e01_path = args["e01_path"]
partition_offset = args.get("partition_offset", 0)
max_results = args.get("max_results", 10)
# E01 핸들러 초기화
if current_e01_path != e01_path or \
e01_handler is None or \
e01_handler.current_offset != partition_offset:
if e01_handler:
e01_handler.close()
e01_handler = E01Handler()
e01_handler.open_e01(e01_path, partition_offset)
current_e01_path = e01_path
# DB 검색
dbs = e01_handler.find_notification_dbs(max_results)
result = {
"status": "success",
"e01_path": e01_path,
"databases_found": len(dbs),
"databases": dbs
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
async def extract_notification_db(args: dict):
"""DB 파일 추출"""
global e01_handler, current_e01_path
e01_path = args["e01_path"]
username = args["username"]
partition_offset = args.get("partition_offset", 0)
# E01 핸들러 초기화
# partition_offset 변경되면 반드시 새로 열어야 함
if (
current_e01_path != e01_path or
e01_handler is None or
(hasattr(e01_handler, "current_offset") and e01_handler.current_offset != partition_offset)
):
if e01_handler:
e01_handler.close()
e01_handler = E01Handler()
if not e01_handler.open_e01(e01_path, partition_offset):
return [TextContent(
type="text",
text=json.dumps({"error": "Failed to open E01 image"}, indent=2)
)]
current_e01_path = e01_path
# 파일 경로 구성
db_path = f"/Users/{username}/AppData/Local/Microsoft/Windows/Notifications/wpndatabase.db"
# 파일 추출
extracted_path = e01_handler.extract_file(db_path)
if not extracted_path:
return [TextContent(
type="text",
text=json.dumps({"error": f"Failed to extract database for user {username}"}, indent=2)
)]
result = {
"status": "success",
"username": username,
"extracted_to": extracted_path,
"size": os.path.getsize(extracted_path)
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
async def parse_all_notifications(args: dict):
"""모든 알림 파싱"""
db_path = args["db_path"]
limit = args.get("limit", 1000)
parser = NotificationParser(db_path)
notifications = parser.get_all_notifications(limit)
parser.close()
result = {
"status": "success",
"database": db_path,
"total_parsed": len(notifications),
"notifications": notifications
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
async def search_notifications(args: dict):
"""키워드 검색"""
db_path = args["db_path"]
keyword = args["keyword"]
limit = args.get("limit", 100)
parser = NotificationParser(db_path)
results = parser.search_notifications(keyword, limit)
parser.close()
result = {
"status": "success",
"keyword": keyword,
"matches_found": len(results),
"results": results
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
async def get_notifications_by_app(args: dict):
"""앱별 알림 조회"""
db_path = args["db_path"]
app_id = args["app_id"]
limit = args.get("limit", 100)
parser = NotificationParser(db_path)
notifications = parser.get_notifications_by_app(app_id, limit)
parser.close()
result = {
"status": "success",
"app_id": app_id,
"count": len(notifications),
"notifications": notifications
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
async def get_notification_timeline(args: dict):
"""타임라인 조회"""
db_path = args["db_path"]
start_time = args.get("start_time")
end_time = args.get("end_time")
limit = args.get("limit", 1000)
parser = NotificationParser(db_path)
timeline = parser.get_notification_timeline(start_time, end_time, limit)
parser.close()
result = {
"status": "success",
"events_count": len(timeline),
"timeline": timeline
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
async def get_notification_statistics(args: dict):
"""통계 생성"""
db_path = args["db_path"]
parser = NotificationParser(db_path)
stats = parser.get_statistics()
parser.close()
result = {
"status": "success",
"statistics": stats
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
async def list_notification_apps(args: dict):
"""앱 목록"""
db_path = args["db_path"]
parser = NotificationParser(db_path)
apps = parser.list_apps()
parser.close()
result = {
"status": "success",
"total_apps": len(apps),
"apps": apps
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
async def extract_sensitive_content(args: dict):
"""민감 정보 추출"""
db_path = args["db_path"]
custom_keywords = args.get("custom_keywords")
parser = NotificationParser(db_path)
if custom_keywords:
keywords = {"custom": custom_keywords}
sensitive = parser.extract_sensitive_content(keywords)
else:
sensitive = parser.extract_sensitive_content()
parser.close()
# 각 카테고리별 개수 계산
summary = {category: len(items) for category, items in sensitive.items()}
result = {
"status": "success",
"summary": summary,
"sensitive_content": sensitive
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
async def auto_analyze_e01_notifications(args: dict):
"""자동 종합 분석"""
global e01_handler, current_e01_path
e01_path = args["e01_path"]
partition_offset = args.get("partition_offset", 0)
include_sensitive = args.get("include_sensitive", True)
# 1. E01 열기
if current_e01_path != e01_path or e01_handler is None:
if e01_handler:
e01_handler.close()
e01_handler = E01Handler()
if not e01_handler.open_e01(e01_path, partition_offset):
return [TextContent(
type="text",
text=json.dumps({"error": "Failed to open E01 image"}, indent=2)
)]
current_e01_path = e01_path
# 2. DB 검색
dbs = e01_handler.find_notification_dbs()
if not dbs:
return [TextContent(
type="text",
text=json.dumps({"status": "success", "message": "No notification databases found"}, indent=2)
)]
# 3. 각 사용자별 분석
users_analysis = []
for db_info in dbs:
username = db_info["username"]
# [수정됨] 해당 DB가 발견된 파티션 오프셋으로 핸들러 재설정
found_offset = db_info.get("partition_offset", 0)
if found_offset > 0:
e01_handler.open_e01(e01_path, found_offset)
# DB 추출
extracted_path = e01_handler.extract_file(db_info["path"])
if not extracted_path:
continue
# 파싱
parser = NotificationParser(extracted_path)
user_data = {
"username": username,
"database_size": db_info["size"],
"statistics": parser.get_statistics(),
"top_apps": parser.list_apps()[:5]
}
if include_sensitive:
sensitive = parser.extract_sensitive_content()
user_data["sensitive_findings"] = {
category: len(items) for category, items in sensitive.items()
}
parser.close()
# 임시 파일 삭제
if os.path.exists(extracted_path):
os.unlink(extracted_path)
users_analysis.append(user_data)
result = {
"status": "success",
"e01_image": e01_path,
"total_users": len(users_analysis),
"users": users_analysis
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
def main():
"""서버 시작"""
logger.info("Starting Windows Notification MCP Server...")
async def run_server():
async with stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream, app.create_initialization_options())
import asyncio
asyncio.run(run_server())
if __name__ == "__main__":
main()