"""
EKS Log Collector 분석용 MCP 서버
실제 EKS log collector tar 구조 기반:
- kernel/: dmesg, uname
- system/: instance-id, ps, netstat
- containerd/: config, logs
- storage/: mounts, lsblk
- var_log/aws-routed-eni/: VPC-CNI 로그
- networking/: iptables, conntrack
- ipamd/: ENI, pod 정보 (JSON)
- kubelet/: kubelet.log, kubeconfig
- cni/: CNI 설정
"""
import os
import tarfile
import json
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, Resource, Prompt, PromptMessage, TextContent as PromptTextContent
server = Server("eks-log-analyzer")
# 트러블슈팅 가이드 정의
TROUBLESHOOTING_GUIDES = {
"vpc-cni": {
"name": "VPC CNI 트러블슈팅 가이드",
"description": "AWS VPC CNI 문제 분석 - IP 할당, iptables, ENI, SNAT 등",
"file": "vpc-cni.md"
},
"dns": {
"name": "DNS 트러블슈팅 가이드",
"description": "DNS 문제 분석 - resolv.conf, CoreDNS, NodeLocalDNS, iptables 규칙",
"file": "dns.md"
},
"node-join": {
"name": "Node Join 트러블슈팅 가이드",
"description": "노드 조인/NotReady 문제 - kubelet, nodeadm, RBAC, VPC CNI 에러",
"file": "node-join.md"
},
"kubelet": {
"name": "Kubelet 트러블슈팅 가이드",
"description": "Kubelet 문제 분석 - PLEG, node status, pod lifecycle",
"file": "kubelet.md"
},
"kube-proxy": {
"name": "Kube-proxy 트러블슈팅 가이드",
"description": "Kube-proxy 문제 분석 - Service 라우팅, iptables/ipvs 규칙",
"file": "kube-proxy.md"
},
"containerd": {
"name": "Containerd 트러블슈팅 가이드",
"description": "Container runtime 문제 분석 - 이미지 pull, 컨테이너 생성",
"file": "containerd.md"
},
"network-policy": {
"name": "Network Policy 트러블슈팅 가이드",
"description": "Network Policy 문제 분석 - eBPF 프로그램/맵, network-policy-agent",
"file": "network-policy.md"
}
}
def get_guides_dir():
"""guides 디렉토리 경로 반환 (MCP 서버 내장)"""
server_dir = os.path.dirname(os.path.abspath(__file__))
return os.path.join(server_dir, "guides")
def read_guide(guide_id):
"""가이드 파일 읽기"""
if guide_id not in TROUBLESHOOTING_GUIDES:
return None
guides_dir = get_guides_dir()
file_path = os.path.join(guides_dir, TROUBLESHOOTING_GUIDES[guide_id]["file"])
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
return None
# ===== MCP Resources =====
@server.list_resources()
async def list_resources():
"""트러블슈팅 가이드를 MCP resources로 노출"""
resources = []
for guide_id, guide_info in TROUBLESHOOTING_GUIDES.items():
resources.append(Resource(
uri=f"eks://guides/{guide_id}",
name=guide_info["name"],
description=guide_info["description"],
mimeType="text/markdown"
))
return resources
@server.read_resource()
async def read_resource(uri: str):
"""리소스 읽기"""
if uri.startswith("eks://guides/"):
guide_id = uri.replace("eks://guides/", "")
content = read_guide(guide_id)
if content:
return content
return f"가이드를 찾을 수 없습니다: {guide_id}"
return f"알 수 없는 리소스: {uri}"
# ===== MCP Prompts =====
@server.list_prompts()
async def list_prompts():
"""트러블슈팅 워크플로우 프롬프트"""
return [
Prompt(
name="troubleshoot-vpc-cni",
description="VPC CNI 문제 분석 워크플로우",
arguments=[
{"name": "log_dir", "description": "EKS 로그 디렉토리 경로", "required": True}
]
),
Prompt(
name="troubleshoot-dns",
description="DNS 문제 분석 워크플로우",
arguments=[
{"name": "log_dir", "description": "EKS 로그 디렉토리 경로", "required": True}
]
),
Prompt(
name="troubleshoot-node-join",
description="Node Join/NotReady 문제 분석 워크플로우",
arguments=[
{"name": "log_dir", "description": "EKS 로그 디렉토리 경로", "required": True}
]
),
Prompt(
name="full-diagnosis",
description="전체 노드 진단 (VPC CNI, DNS, kubelet, containerd)",
arguments=[
{"name": "log_dir", "description": "EKS 로그 디렉토리 경로", "required": True}
]
)
]
@server.get_prompt()
async def get_prompt(name: str, arguments: dict):
"""프롬프트 내용 반환"""
log_dir = arguments.get("log_dir", "<log_dir>")
if name == "troubleshoot-vpc-cni":
guide = read_guide("vpc-cni") or ""
return {
"messages": [
PromptMessage(
role="user",
content=PromptTextContent(
type="text",
text=f"""EKS VPC CNI 문제를 분석해주세요.
로그 디렉토리: {log_dir}
분석 순서:
1. analyze_vpc_cni 도구로 VPC CNI 로그 및 ipamd 데이터 확인
2. analyze_networking 도구로 iptables, ip rule, ip route 확인
3. 아래 가이드를 참고하여 문제 진단
{guide}"""
)
)
]
}
elif name == "troubleshoot-dns":
guide = read_guide("dns") or ""
return {
"messages": [
PromptMessage(
role="user",
content=PromptTextContent(
type="text",
text=f"""EKS DNS 문제를 분석해주세요.
로그 디렉토리: {log_dir}
분석 순서:
1. analyze_dns 도구로 4단계 DNS 트러블슈팅 수행
2. 외부 ping → kubelet DNS → resolv.conf → iptables 순서로 확인
3. 아래 가이드를 참고하여 문제 진단
{guide}"""
)
)
]
}
elif name == "troubleshoot-node-join":
guide = read_guide("node-join") or ""
return {
"messages": [
PromptMessage(
role="user",
content=PromptTextContent(
type="text",
text=f"""EKS Node Join/NotReady 문제를 분석해주세요.
로그 디렉토리: {log_dir}
분석 순서:
1. analyze_node_join 도구로 kubelet.log 존재 여부 확인
2. kubelet.log 없으면 → nodeadm 로그 확인
3. kubelet.log 있으면 → "Not Authorized" 또는 "Network Not Ready" 에러 확인
4. 아래 가이드를 참고하여 문제 진단
{guide}"""
)
)
]
}
elif name == "full-diagnosis":
return {
"messages": [
PromptMessage(
role="user",
content=PromptTextContent(
type="text",
text=f"""EKS 노드 전체 진단을 수행해주세요.
로그 디렉토리: {log_dir}
진단 순서:
1. get_node_info - 노드 기본 정보 확인
2. analyze_kubelet - kubelet 상태 확인
3. analyze_containerd - container runtime 상태 확인
4. analyze_vpc_cni - VPC CNI 상태 확인
5. analyze_dns - DNS 설정 확인
6. analyze_networking - 네트워킹 규칙 확인
각 단계에서 발견된 문제를 정리하고, 권장 조치사항을 제시해주세요."""
)
)
]
}
return {"messages": []}
@server.list_tools()
async def list_tools():
return [
Tool(
name="extract_eks_logs",
description="EKS log collector tar 파일을 압축 해제합니다",
inputSchema={
"type": "object",
"properties": {
"tar_path": {"type": "string", "description": "tar 파일 경로"}
},
"required": ["tar_path"]
}
),
Tool(
name="list_structure",
description="압축 해제된 EKS 로그 디렉토리 구조를 보여줍니다",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"},
"depth": {"type": "integer", "description": "깊이 (기본: 2)", "default": 2}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_vpc_cni",
description="VPC-CNI 로그 분석 (var_log/aws-routed-eni/, ipamd/)",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"},
"keyword": {"type": "string", "description": "필터 키워드 (선택)"}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_node_join",
description="Node Join/NotReady 트러블슈팅 (kubelet 로그, nodeadm, RBAC, VPC CNI 에러 확인)",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_kubelet",
description="kubelet 로그 및 설정 분석",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"},
"keyword": {"type": "string", "description": "필터 키워드 (선택)"}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_containerd",
description="containerd 로그 및 설정 분석",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"},
"keyword": {"type": "string", "description": "필터 키워드 (선택)"}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_storage",
description="스토리지 정보 분석 (mounts, lsblk, fstab)",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_dns",
description="DNS 트러블슈팅 (외부 ping, resolv.conf, kubelet DNS, iptables DNS 규칙)",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_networking",
description="네트워킹 분석 (iptables, conntrack, routes, ebpf)",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"},
"target": {
"type": "string",
"enum": ["iptables", "conntrack", "routes", "ebpf", "all"],
"description": "분석 대상 (ebpf: Network Policy eBPF 데이터)",
"default": "all"
}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_network_policy",
description="Network Policy 분석 (eBPF 프로그램/맵, network-policy-agent 로그)",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"},
"keyword": {"type": "string", "description": "필터 키워드 (선택)"}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_system",
description="시스템 정보 분석 (ps, netstat, dmesg)",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"},
"target": {
"type": "string",
"enum": ["ps", "netstat", "dmesg", "all"],
"default": "all"
}
},
"required": ["log_dir"]
}
),
Tool(
name="analyze_pod_logs",
description="특정 Pod 로그 분석 (kube-proxy, aws-node 등)",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"},
"pod_name": {"type": "string", "description": "Pod 이름 패턴 (예: kube-proxy, aws-node)"},
"keyword": {"type": "string", "description": "필터 키워드 (선택)"}
},
"required": ["log_dir", "pod_name"]
}
),
Tool(
name="search_logs",
description="전체 로그에서 패턴 검색",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"},
"pattern": {"type": "string", "description": "검색 패턴"},
"file_ext": {"type": "string", "description": "파일 확장자 (예: .log, .txt)", "default": ""}
},
"required": ["log_dir", "pattern"]
}
),
Tool(
name="get_node_info",
description="노드 기본 정보 (instance-id, region, kernel)",
inputSchema={
"type": "object",
"properties": {
"log_dir": {"type": "string", "description": "로그 디렉토리 경로"}
},
"required": ["log_dir"]
}
)
]
def read_file(path, max_lines=300):
"""파일 읽기 (라인 제한)"""
try:
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
if len(lines) > max_lines:
return f"[{len(lines)}줄 중 처음 {max_lines}줄]\n" + "".join(lines[:max_lines])
return "".join(lines)
except Exception as e:
return f"읽기 오류: {e}"
def read_json(path):
"""JSON 파일 읽기"""
try:
with open(path, 'r') as f:
return json.dumps(json.load(f), indent=2, ensure_ascii=False)
except Exception as e:
return f"JSON 파싱 오류: {e}"
def filter_lines(content, keyword):
"""키워드로 필터링"""
if not keyword:
return content
lines = [l for l in content.split('\n') if keyword.lower() in l.lower()]
return '\n'.join(lines) if lines else f"'{keyword}' 매치 없음"
@server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "extract_eks_logs":
tar_path = os.path.expanduser(arguments["tar_path"])
if not os.path.exists(tar_path):
return [TextContent(type="text", text=f"파일 없음: {tar_path}")]
# 디렉토리명 생성 (.tar.tar, .tar 제거)
base = os.path.basename(tar_path)
for ext in ['.tar.tar', '.tar.gz', '.tgz', '.tar']:
if base.endswith(ext):
base = base[:-len(ext)]
break
extract_dir = os.path.join(os.path.dirname(tar_path), base)
try:
os.makedirs(extract_dir, exist_ok=True)
with tarfile.open(tar_path, 'r:*') as tar:
tar.extractall(extract_dir)
return [TextContent(type="text", text=f"압축 해제 완료: {extract_dir}")]
except Exception as e:
return [TextContent(type="text", text=f"오류: {e}")]
elif name == "list_structure":
log_dir = os.path.expanduser(arguments["log_dir"])
depth = arguments.get("depth", 2)
if not os.path.exists(log_dir):
return [TextContent(type="text", text=f"디렉토리 없음: {log_dir}")]
result = [f"📁 {log_dir}\n"]
def walk(path, level, prefix=""):
if level > depth:
return
try:
items = sorted(os.listdir(path))
for i, item in enumerate(items):
item_path = os.path.join(path, item)
is_last = i == len(items) - 1
conn = "└── " if is_last else "├── "
if os.path.isdir(item_path):
result.append(f"{prefix}{conn}📁 {item}/")
new_prefix = prefix + (" " if is_last else "│ ")
walk(item_path, level + 1, new_prefix)
else:
size = os.path.getsize(item_path)
sz = f"{size/1024:.1f}K" if size > 1024 else f"{size}B"
result.append(f"{prefix}{conn}📄 {item} ({sz})")
except:
pass
walk(log_dir, 1)
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_vpc_cni":
log_dir = os.path.expanduser(arguments["log_dir"])
keyword = arguments.get("keyword", "")
result = ["=== VPC-CNI 분석 ===\n"]
# aws-routed-eni 로그
eni_dir = os.path.join(log_dir, "var_log", "aws-routed-eni")
if os.path.exists(eni_dir):
result.append("📁 var_log/aws-routed-eni/")
for f in os.listdir(eni_dir):
fpath = os.path.join(eni_dir, f)
if os.path.isfile(fpath):
result.append(f"\n--- {f} ---")
content = read_file(fpath, 150)
result.append(filter_lines(content, keyword))
# ipamd JSON 데이터
ipamd_dir = os.path.join(log_dir, "ipamd")
if os.path.exists(ipamd_dir):
result.append("\n\n📁 ipamd/ (ENI/IP 할당 정보)")
for f in ["enis.json", "pods.json", "metrics.json", "ipam.json"]:
fpath = os.path.join(ipamd_dir, f)
if os.path.exists(fpath):
result.append(f"\n--- {f} ---")
result.append(read_json(fpath))
# CNI 설정
cni_dir = os.path.join(log_dir, "cni")
if os.path.exists(cni_dir):
result.append("\n\n📁 cni/ (CNI 설정)")
for f in os.listdir(cni_dir):
fpath = os.path.join(cni_dir, f)
result.append(f"\n--- {f} ---")
result.append(read_file(fpath, 50))
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_node_join":
log_dir = os.path.expanduser(arguments["log_dir"])
result = ["=== Node Join 트러블슈팅 ===\n"]
# Step 1: kubelet.log 존재 확인
kubelet_log = os.path.join(log_dir, "kubelet", "kubelet.log")
kubelet_exists = os.path.exists(kubelet_log)
result.append(f"📌 Step 1: kubelet.log 존재: {'✅ 있음' if kubelet_exists else '❌ 없음'}")
if not kubelet_exists:
# Step 1-1: nodeadm 로그 확인
result.append("\n\n📌 Step 1-1: nodeadm 로그 (kubelet 미실행)")
nodeadm_dir = os.path.join(log_dir, "nodeadm")
if os.path.exists(nodeadm_dir):
for f in os.listdir(nodeadm_dir):
fpath = os.path.join(nodeadm_dir, f)
result.append(f"\n--- {f} ---")
result.append(read_file(fpath, 100))
else:
result.append(" nodeadm 폴더 없음")
cloud_init = os.path.join(log_dir, "var_log", "cloud-init-output.log")
if os.path.exists(cloud_init):
result.append("\n\n--- cloud-init bootstrap ---")
content = read_file(cloud_init, 100)
err = [l for l in content.split('\n') if 'error' in l.lower() or 'fail' in l.lower()]
result.append('\n'.join(err[:20]) if err else " 에러 없음")
else:
# Step 1-2: kubelet 에러 패턴 확인
result.append("\n\n📌 Step 1-2: kubelet 에러 패턴")
content = read_file(kubelet_log, 500)
auth_err = [l for l in content.split('\n') if 'unauthorized' in l.lower() or 'not authorized' in l.lower()]
if auth_err:
result.append("\n⚠️ RBAC 에러:")
result.append('\n'.join(auth_err[:10]))
result.append(" → aws-auth ConfigMap 확인")
net_err = [l for l in content.split('\n') if 'network' in l.lower() and 'not ready' in l.lower()]
if net_err:
result.append("\n\n⚠️ Network Not Ready:")
result.append('\n'.join(net_err[:10]))
result.append(" → VPC CNI 확인 (analyze_vpc_cni)")
if not auth_err and not net_err:
result.append("\n✅ 주요 에러 없음")
curl_api = os.path.join(log_dir, "networking", "curl_api_server.txt")
if os.path.exists(curl_api):
result.append("\n\n📌 API 서버 연결:")
result.append(read_file(curl_api, 20))
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_kubelet":
log_dir = os.path.expanduser(arguments["log_dir"])
keyword = arguments.get("keyword", "")
result = ["=== Kubelet 분석 ===\n"]
kubelet_dir = os.path.join(log_dir, "kubelet")
if os.path.exists(kubelet_dir):
# kubelet.log
log_path = os.path.join(kubelet_dir, "kubelet.log")
if os.path.exists(log_path):
result.append("� kubelet.log")
content = read_file(log_path, 200)
result.append(filter_lines(content, keyword))
# kubeconfig
config_path = os.path.join(kubelet_dir, "kubeconfig.yaml")
if os.path.exists(config_path):
result.append("\n\n📄 kubeconfig.yaml")
result.append(read_file(config_path, 50))
# kubelet service
svc_path = os.path.join(kubelet_dir, "kubelet_service.txt")
if os.path.exists(svc_path):
result.append("\n\n📄 kubelet_service.txt")
result.append(read_file(svc_path, 30))
# nodeadm 로그 (AL2023)
nodeadm_dir = os.path.join(log_dir, "nodeadm")
if os.path.exists(nodeadm_dir):
result.append("\n\n📁 nodeadm/ (AL2023)")
for f in os.listdir(nodeadm_dir):
fpath = os.path.join(nodeadm_dir, f)
result.append(f"\n--- {f} ---")
result.append(read_file(fpath, 50))
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_containerd":
log_dir = os.path.expanduser(arguments["log_dir"])
keyword = arguments.get("keyword", "")
result = ["=== Containerd 분석 ===\n"]
cd_dir = os.path.join(log_dir, "containerd")
if os.path.exists(cd_dir):
# 로그
log_path = os.path.join(cd_dir, "containerd-log.txt")
if os.path.exists(log_path):
result.append("📄 containerd-log.txt")
content = read_file(log_path, 200)
result.append(filter_lines(content, keyword))
# 설정
config_path = os.path.join(cd_dir, "containerd-config.txt")
if os.path.exists(config_path):
result.append("\n\n📄 containerd-config.txt")
result.append(read_file(config_path, 100))
# 기타 정보
for f in ["containerd-version.txt", "containerd-images.txt", "containerd-containers.txt"]:
fpath = os.path.join(cd_dir, f)
if os.path.exists(fpath):
result.append(f"\n\n📄 {f}")
result.append(read_file(fpath, 50))
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_storage":
log_dir = os.path.expanduser(arguments["log_dir"])
result = ["=== 스토리지 분석 ===\n"]
storage_dir = os.path.join(log_dir, "storage")
if os.path.exists(storage_dir):
for f in ["mounts.txt", "lsblk.txt", "fstab.txt", "inodes.txt", "pod_local_storage.txt"]:
fpath = os.path.join(storage_dir, f)
if os.path.exists(fpath):
result.append(f"\n📄 {f}")
result.append(read_file(fpath, 50))
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_dns":
log_dir = os.path.expanduser(arguments["log_dir"])
result = ["=== DNS 트러블슈팅 ===\n"]
# Step 1: 외부 연결 확인
result.append("📌 Step 1: 외부 연결 확인")
net_dir = os.path.join(log_dir, "networking")
for ping_file in ["ping_amazon.com.txt", "ping_public.ecr.aws.txt"]:
fpath = os.path.join(net_dir, ping_file)
if os.path.exists(fpath):
result.append(f"\n--- {ping_file} ---")
result.append(read_file(fpath, 20))
# Step 2: Kubelet DNS 설정
result.append("\n\n📌 Step 2: Kubelet DNS 설정")
# cloud-init에서 DNS 설정 확인
cloud_init = os.path.join(log_dir, "var_log", "cloud-init-output.log")
if os.path.exists(cloud_init):
content = read_file(cloud_init, 500)
dns_lines = [l for l in content.split('\n') if 'dns' in l.lower() or 'K8S_CLUSTER_DNS' in l]
if dns_lines:
result.append("\n--- cloud-init DNS 설정 ---")
result.append('\n'.join(dns_lines[:10]))
# kubelet.log에서 cluster-dns 확인
kubelet_log = os.path.join(log_dir, "kubelet", "kubelet.log")
if os.path.exists(kubelet_log):
content = read_file(kubelet_log, 500)
dns_lines = [l for l in content.split('\n') if 'cluster-dns' in l.lower()]
if dns_lines:
result.append("\n--- kubelet cluster-dns ---")
result.append('\n'.join(dns_lines[:5]))
# Step 3: Node resolv.conf
result.append("\n\n📌 Step 3: Node resolv.conf")
resolv_path = os.path.join(net_dir, "resolv.conf")
if os.path.exists(resolv_path):
content = read_file(resolv_path, 20)
result.append(content)
# DNS 서버 분석
for line in content.split('\n'):
if line.startswith('nameserver'):
ip = line.split()[1] if len(line.split()) > 1 else ''
if ip.endswith('.2'):
result.append(f" → {ip}: VPC CIDR+2 (AmazonProvidedDNS) ✅")
elif ip.endswith('.3'):
result.append(f" → {ip}: VPC CIDR+3 (Custom DNS Resolver) ⚠️ 추가 확인 필요")
# Step 4: iptables DNS 규칙
result.append("\n\n📌 Step 4: iptables DNS 규칙 (kube-dns/CoreDNS)")
iptables_path = os.path.join(net_dir, "iptables-save.txt")
if os.path.exists(iptables_path):
content = read_file(iptables_path, 500)
dns_rules = [l for l in content.split('\n') if 'kube-dns' in l.lower() or 'dns' in l.lower()]
if dns_rules:
result.append('\n'.join(dns_rules[:15]))
# CoreDNS endpoint 확인
coredns_ips = []
for line in dns_rules:
if 'DNAT' in line and ':53' in line:
# --to-destination 10.0.x.x:53 추출
if '--to-destination' in line:
parts = line.split('--to-destination')
if len(parts) > 1:
coredns_ips.append(parts[1].strip().split()[0])
if coredns_ips:
result.append(f"\n → CoreDNS Pod IPs: {', '.join(set(coredns_ips))}")
else:
result.append(" ⚠️ kube-dns 관련 iptables 규칙 없음!")
# NodeLocalDNS 확인
if os.path.exists(iptables_path):
content = read_file(iptables_path, 500)
if '169.254.20.10' in content:
result.append("\n → NodeLocalDNS 사용 중 (169.254.20.10)")
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_networking":
log_dir = os.path.expanduser(arguments["log_dir"])
target = arguments.get("target", "all")
result = ["=== 네트워킹 분석 ===\n"]
net_dir = os.path.join(log_dir, "networking")
if os.path.exists(net_dir):
files_map = {
"iptables": ["iptables-save.txt", "iptables-nat.txt", "iptables-filter.txt"],
"conntrack": ["conntrack.txt"],
"routes": ["iproute.txt", "iprule.txt", "ifconfig.txt"],
"ebpf": ["ebpf-data.txt", "ebpf-maps-data.txt"]
}
targets = files_map.keys() if target == "all" else [target]
for t in targets:
if t in files_map:
result.append(f"\n📁 {t.upper()}")
for f in files_map[t]:
fpath = os.path.join(net_dir, f)
if os.path.exists(fpath):
result.append(f"\n--- {f} ---")
result.append(read_file(fpath, 100))
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_network_policy":
log_dir = os.path.expanduser(arguments["log_dir"])
keyword = arguments.get("keyword", "")
result = ["=== Network Policy 분석 ===\n"]
# eBPF 데이터
net_dir = os.path.join(log_dir, "networking")
if os.path.exists(net_dir):
result.append("📁 eBPF 프로그램/맵")
ebpf_data = os.path.join(net_dir, "ebpf-data.txt")
if os.path.exists(ebpf_data):
result.append("\n--- ebpf-data.txt (eBPF 프로그램) ---")
result.append(read_file(ebpf_data, 100))
ebpf_maps = os.path.join(net_dir, "ebpf-maps-data.txt")
if os.path.exists(ebpf_maps):
result.append("\n--- ebpf-maps-data.txt (eBPF 맵) ---")
result.append(read_file(ebpf_maps, 150))
# Network Policy Agent 로그
eni_dir = os.path.join(log_dir, "var_log", "aws-routed-eni")
if os.path.exists(eni_dir):
result.append("\n\n📁 Network Policy Agent 로그")
np_log = os.path.join(eni_dir, "network-policy-agent.log")
if os.path.exists(np_log):
result.append("\n--- network-policy-agent.log ---")
content = read_file(np_log, 200)
result.append(filter_lines(content, keyword))
# ebpf-sdk 로그도 확인
ebpf_sdk = os.path.join(eni_dir, "ebpf-sdk.log")
if os.path.exists(ebpf_sdk):
result.append("\n--- ebpf-sdk.log ---")
content = read_file(ebpf_sdk, 100)
result.append(filter_lines(content, keyword))
# 커널 버전 (eBPF 요구사항 확인용)
uname_path = os.path.join(log_dir, "kernel", "uname.txt")
if os.path.exists(uname_path):
result.append("\n\n📄 커널 버전 (eBPF 요구: 5.10+)")
result.append(read_file(uname_path, 5))
return [TextContent(type="text", text="\n".join(result))]
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_system":
log_dir = os.path.expanduser(arguments["log_dir"])
target = arguments.get("target", "all")
result = ["=== 시스템 분석 ===\n"]
files_map = {
"ps": [("system", "ps.txt"), ("system", "top.txt")],
"netstat": [("system", "netstat.txt")],
"dmesg": [("kernel", "dmesg.current"), ("kernel", "dmesg.human.current")]
}
targets = files_map.keys() if target == "all" else [target]
for t in targets:
if t in files_map:
result.append(f"\n📁 {t.upper()}")
for subdir, fname in files_map[t]:
fpath = os.path.join(log_dir, subdir, fname)
if os.path.exists(fpath):
result.append(f"\n--- {fname} ---")
result.append(read_file(fpath, 100))
return [TextContent(type="text", text="\n".join(result))]
elif name == "analyze_pod_logs":
log_dir = os.path.expanduser(arguments["log_dir"])
pod_name = arguments["pod_name"].lower()
keyword = arguments.get("keyword", "")
result = [f"=== Pod 로그: {pod_name} ===\n"]
var_log = os.path.join(log_dir, "var_log")
if os.path.exists(var_log):
for item in os.listdir(var_log):
if pod_name in item.lower():
item_path = os.path.join(var_log, item)
if os.path.isfile(item_path):
result.append(f"\n📄 {item}")
content = read_file(item_path, 150)
result.append(filter_lines(content, keyword))
elif os.path.isdir(item_path):
result.append(f"\n📁 {item}/")
for sub in os.listdir(item_path):
sub_path = os.path.join(item_path, sub)
if os.path.isdir(sub_path):
for f in os.listdir(sub_path):
fpath = os.path.join(sub_path, f)
if os.path.isfile(fpath):
result.append(f"\n--- {sub}/{f} ---")
content = read_file(fpath, 100)
result.append(filter_lines(content, keyword))
return [TextContent(type="text", text="\n".join(result))]
elif name == "search_logs":
log_dir = os.path.expanduser(arguments["log_dir"])
pattern = arguments["pattern"].lower()
file_ext = arguments.get("file_ext", "")
result = [f"=== '{pattern}' 검색 ===\n"]
matches = 0
for root, dirs, files in os.walk(log_dir):
for f in files:
if file_ext and not f.endswith(file_ext):
continue
fpath = os.path.join(root, f)
try:
with open(fpath, 'r', encoding='utf-8', errors='ignore') as fp:
lines = fp.readlines()
found = [(i+1, l.strip()) for i, l in enumerate(lines) if pattern in l.lower()]
if found:
rel = os.path.relpath(fpath, log_dir)
result.append(f"\n📄 {rel}")
for ln, line in found[:5]:
result.append(f" L{ln}: {line[:150]}")
if len(found) > 5:
result.append(f" ... +{len(found)-5}개")
matches += len(found)
except:
continue
result.append(f"\n\n총 {matches}개 매치")
return [TextContent(type="text", text="\n".join(result))]
elif name == "get_node_info":
log_dir = os.path.expanduser(arguments["log_dir"])
result = ["=== 노드 정보 ===\n"]
info_files = [
("system", "instance-id.txt", "Instance ID"),
("system", "region.txt", "Region"),
("system", "availability-zone.txt", "AZ"),
("kernel", "uname.txt", "Kernel"),
]
for subdir, fname, label in info_files:
fpath = os.path.join(log_dir, subdir, fname)
if os.path.exists(fpath):
content = read_file(fpath, 5).strip()
result.append(f"{label}: {content}")
return [TextContent(type="text", text="\n".join(result))]
return [TextContent(type="text", text=f"알 수 없는 도구: {name}")]
async def main():
async with stdio_server() as (read, write):
await server.run(read, write, server.create_initialization_options())
if __name__ == "__main__":
asyncio.run(main())