"""
Compliance Scanner MCP Server
ISMS-P, NIST, CIS Benchmark 등 보안 규제 문서를 분석하는 MCP 서버입니다.
Tools:
- analyze_regulation: 규제 텍스트 분석
- read_pdf: PDF 파일 읽기
- search_in_pdf: PDF 키워드 검색
- analyze_pdf_regulation: PDF 읽기 + 분석
- list_regulations: 규제 파일 목록
- compare_regulations: 규제 비교 분석
Resources:
- regulation://list: 규제 목록
- regulation://isms-p/{item_id}: ISMS-P 규제 조회
"""
import asyncio
import json
from pathlib import Path
from typing import Any
from mcp.server import Server
from mcp.types import Tool, Resource, TextContent
import mcp.server.stdio
from .analyzers.text_analyzer import analyze_regulation_text, compare_regulations
from .analyzers.pdf_reader import (
read_pdf_to_text,
search_keyword_in_pdf,
get_pdf_info,
list_regulation_files
)
# MCP 서버 인스턴스 생성
server = Server("compliance-scanner-mcp")
# 기본 경로 설정
BASE_DIR = Path(__file__).parent.parent
REGULATIONS_DIR = BASE_DIR / "regulations"
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Tools 정의
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@server.list_tools()
async def list_tools() -> list[Tool]:
"""사용 가능한 도구 목록을 반환합니다."""
return [
Tool(
name="analyze_regulation",
description="규제 텍스트를 분석하여 구조화된 JSON으로 변환합니다. "
"규제 ID, 제목, 요약, 요구사항, 기술 키워드, AWS 서비스 추천, "
"난이도, 예상 구현 일수를 추출합니다.",
inputSchema={
"type": "object",
"properties": {
"regulation_text": {
"type": "string",
"description": "분석할 규제 텍스트 (ISMS-P, NIST, CIS 등)"
}
},
"required": ["regulation_text"]
}
),
Tool(
name="read_pdf",
description="PDF 파일을 읽어서 텍스트로 변환합니다. "
"페이지 범위를 지정할 수 있습니다.",
inputSchema={
"type": "object",
"properties": {
"pdf_path": {
"type": "string",
"description": "PDF 경로 (예: 'regulations/ISMS-P.pdf' 또는 'ISMS-P.pdf')"
},
"start_page": {
"type": "integer",
"description": "시작 페이지 (기본값: 1)",
"default": 1
},
"end_page": {
"type": "integer",
"description": "끝 페이지 (기본값: 마지막 페이지)"
}
},
"required": ["pdf_path"]
}
),
Tool(
name="search_in_pdf",
description="PDF 파일에서 키워드를 검색합니다. "
"대소문자 구분 없이 검색하며, 매칭된 줄과 페이지 정보를 반환합니다.",
inputSchema={
"type": "object",
"properties": {
"pdf_path": {
"type": "string",
"description": "PDF 경로"
},
"keyword": {
"type": "string",
"description": "검색할 키워드"
}
},
"required": ["pdf_path", "keyword"]
}
),
Tool(
name="analyze_pdf_regulation",
description="PDF 파일을 읽고 바로 분석합니다 (read_pdf + analyze_regulation 결합). "
"PDF에서 텍스트를 추출한 후 규제 분석을 수행합니다.",
inputSchema={
"type": "object",
"properties": {
"pdf_path": {
"type": "string",
"description": "PDF 경로"
},
"start_page": {
"type": "integer",
"description": "시작 페이지 (선택)"
},
"end_page": {
"type": "integer",
"description": "끝 페이지 (선택)"
}
},
"required": ["pdf_path"]
}
),
Tool(
name="list_regulations",
description="regulations/ 폴더의 모든 규제 문서 파일 목록을 반환합니다. "
"파일 경로, 이름, 타입, 크기, 페이지 수(PDF) 정보를 포함합니다.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="compare_regulations",
description="여러 규제 텍스트를 비교 분석합니다. "
"공통 요구사항, 차이점, 통합 구현 방안을 제시합니다.",
inputSchema={
"type": "object",
"properties": {
"regulation_texts": {
"type": "array",
"items": {"type": "string"},
"description": "비교할 규제 텍스트 목록 (최소 2개)"
}
},
"required": ["regulation_texts"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""도구 호출을 처리합니다."""
try:
if name == "analyze_regulation":
result = analyze_regulation_text(arguments["regulation_text"])
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
elif name == "read_pdf":
text = read_pdf_to_text(
arguments["pdf_path"],
arguments.get("start_page", 1),
arguments.get("end_page")
)
return [TextContent(type="text", text=text)]
elif name == "search_in_pdf":
result = search_keyword_in_pdf(
arguments["pdf_path"],
arguments["keyword"]
)
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
elif name == "analyze_pdf_regulation":
# PDF 읽기
text = read_pdf_to_text(
arguments["pdf_path"],
arguments.get("start_page", 1),
arguments.get("end_page")
)
# 분석
result = analyze_regulation_text(text)
# PDF 정보 추가
pdf_info = get_pdf_info(arguments["pdf_path"])
result["pdf_info"] = pdf_info
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
elif name == "list_regulations":
files = list_regulation_files()
return [TextContent(type="text", text=json.dumps(files, ensure_ascii=False, indent=2))]
elif name == "compare_regulations":
regulation_texts = arguments["regulation_texts"]
if len(regulation_texts) < 2:
return [TextContent(
type="text",
text=json.dumps({"error": "비교를 위해 최소 2개의 규제 텍스트가 필요합니다."}, ensure_ascii=False)
)]
# 각 규제 분석
analyses = [analyze_regulation_text(text) for text in regulation_texts]
# 비교
result = compare_regulations(analyses)
return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, indent=2))]
else:
return [TextContent(type="text", text=f"알 수 없는 도구: {name}")]
except FileNotFoundError as e:
return [TextContent(type="text", text=f"파일 오류: {str(e)}")]
except ValueError as e:
return [TextContent(type="text", text=f"값 오류: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"오류 발생: {str(e)}")]
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Resources 정의
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@server.list_resources()
async def list_resources() -> list[Resource]:
"""사용 가능한 리소스 목록을 반환합니다."""
resources = [
Resource(
uri="regulation://list",
name="규제 문서 목록",
description="regulations/ 폴더의 모든 규제 문서 파일 목록",
mimeType="text/plain"
)
]
# ISMS-P 텍스트 파일들을 동적으로 리소스로 추가
if REGULATIONS_DIR.exists():
for file_path in REGULATIONS_DIR.glob("isms-p-*.txt"):
# 파일명에서 item_id 추출 (예: isms-p-2-8-1.txt → 2-8-1)
item_id = file_path.stem.replace("isms-p-", "")
resources.append(Resource(
uri=f"regulation://isms-p/{item_id}",
name=f"ISMS-P {item_id.replace('-', '.')}",
description=f"ISMS-P {item_id.replace('-', '.')} 규제 문서",
mimeType="text/plain"
))
return resources
@server.read_resource()
async def read_resource(uri: str) -> str:
"""리소스를 읽어서 반환합니다."""
if uri == "regulation://list":
# 규제 파일 목록 반환
files = list_regulation_files()
if not files:
return "regulations/ 폴더에 파일이 없습니다."
lines = ["=== 규제 문서 목록 ===\n"]
for f in files:
line = f"• {f['name']} ({f['type']}, {f['size_mb']}MB)"
if 'pages' in f:
line += f" - {f['pages']}페이지"
lines.append(line)
return "\n".join(lines)
elif uri.startswith("regulation://isms-p/"):
# ISMS-P 규제 조회
item_id = uri.replace("regulation://isms-p/", "")
file_path = REGULATIONS_DIR / f"isms-p-{item_id}.txt"
if not file_path.exists():
return f"ISMS-P {item_id.replace('-', '.')} 규제 문서를 찾을 수 없습니다."
return file_path.read_text(encoding="utf-8")
else:
return f"알 수 없는 리소스: {uri}"
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# 서버 실행
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
async def main():
"""MCP 서버를 시작합니다."""
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
def run():
"""동기 진입점"""
asyncio.run(main())
if __name__ == "__main__":
run()