#!/usr/bin/env python3
"""
Filesystem MCP Advanced - 고급 기능
Complete Filesystem MCP의 고급 기능들을 포함합니다.
- 디렉토리 트리 구조 조회
- 파일 이동/이름 변경
- 파일 검색
- 파일 정보 조회
- 허용된 디렉토리 목록 조회
"""
import os
import shutil
import hashlib
import mimetypes
import fnmatch
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import asdict
from filesystem_base import FilesystemMCPBase, FileOperation, PermissionLevel
class FilesystemMCPAdvanced(FilesystemMCPBase):
"""파일시스템 MCP 고급 기능"""
def directory_tree(self, path: str, max_depth: int = 10, include_hidden: bool = False) -> Dict[str, Any]:
"""디렉토리 트리 구조 조회 (핵심 기능 8)"""
try:
self.logger.info(f"디렉토리 트리 조회 시작: {path}")
# Roots 프로토콜 권한 확인
root_name = self._find_root_for_path(path)
if not root_name:
error_msg = f'허용되지 않은 경로: {path}'
self.logger.warning(error_msg)
self._log_audit_event(
operation=FileOperation.READ,
path=path,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=error_msg
)
return {'success': False, 'error': error_msg}
# 읽기 권한 확인
if not self._check_permission(root_name, FileOperation.READ):
error_msg = f'읽기 권한이 없습니다: {path}'
self.logger.warning(error_msg)
self._log_audit_event(
operation=FileOperation.READ,
path=path,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=error_msg
)
return {'success': False, 'error': error_msg}
# 경로 존재 확인
if not os.path.exists(path):
error_msg = f'경로가 존재하지 않습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 디렉토리 확인
if not os.path.isdir(path):
error_msg = f'디렉토리가 아닙니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 재귀적으로 트리 구조 생성
tree_data = self._build_directory_tree(path, max_depth, include_hidden, current_depth=0)
# 감사 로그 기록
self._log_audit_event(
operation=FileOperation.READ,
path=path,
user=os.getenv('USER', 'unknown'),
success=True
)
self.logger.info(f"디렉토리 트리 조회 완료: {path}")
return {
'success': True,
'path': path,
'tree': tree_data,
'max_depth': max_depth,
'include_hidden': include_hidden
}
except Exception as e:
error_msg = f'디렉토리 트리 조회 실패: {str(e)}'
self.logger.error(error_msg)
self._log_audit_event(
operation=FileOperation.READ,
path=path,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=str(e)
)
return {'success': False, 'error': error_msg}
def _build_directory_tree(self, path: str, max_depth: int, include_hidden: bool, current_depth: int = 0) -> Dict[str, Any]:
"""재귀적으로 디렉토리 트리 구조 생성"""
try:
tree_node = {
'name': os.path.basename(path),
'type': 'directory',
'path': path,
'children': []
}
# 최대 깊이 확인
if current_depth >= max_depth:
tree_node['truncated'] = True
return tree_node
# 디렉토리 내용 읽기
try:
entries = os.listdir(path)
except PermissionError:
tree_node['error'] = 'Permission denied'
return tree_node
# 정렬
entries.sort()
for entry in entries:
# 숨김 파일 처리
if not include_hidden and entry.startswith('.'):
continue
entry_path = os.path.join(path, entry)
try:
if os.path.isdir(entry_path):
# 하위 디렉토리 재귀 호출
child_tree = self._build_directory_tree(
entry_path, max_depth, include_hidden, current_depth + 1
)
tree_node['children'].append(child_tree)
else:
# 파일 노드 생성
file_node = {
'name': entry,
'type': 'file',
'path': entry_path
}
# 파일 크기 추가 (선택적)
try:
stat_info = os.stat(entry_path)
file_node['size'] = stat_info.st_size
file_node['size_display'] = self._format_file_size(stat_info.st_size)
file_node['modified'] = datetime.fromtimestamp(stat_info.st_mtime).isoformat()
except (OSError, IOError):
pass
tree_node['children'].append(file_node)
except (OSError, IOError) as e:
# 접근할 수 없는 항목은 오류 노드로 추가
error_node = {
'name': entry,
'type': 'error',
'path': entry_path,
'error': str(e)
}
tree_node['children'].append(error_node)
# 통계 정보 추가
total_files = sum(1 for child in tree_node['children'] if child['type'] == 'file')
total_dirs = sum(1 for child in tree_node['children'] if child['type'] == 'directory')
tree_node['stats'] = {
'total_files': total_files,
'total_directories': total_dirs,
'total_items': len(tree_node['children'])
}
return tree_node
except Exception as e:
self.logger.error(f"트리 구조 생성 실패 {path}: {e}")
return {
'name': os.path.basename(path),
'type': 'error',
'path': path,
'error': str(e)
}
def move_file(self, source: str, destination: str, overwrite: bool = False) -> Dict[str, Any]:
"""파일 이동/이름 변경 (핵심 기능 9)"""
try:
self.logger.info(f"파일 이동 시도: {source} -> {destination}")
# 소스 경로 Roots 프로토콜 권한 확인
source_root = self._find_root_for_path(source)
if not source_root:
error_msg = f'소스 경로가 허용되지 않음: {source}'
self.logger.warning(error_msg)
self._log_audit_event(
operation=FileOperation.DELETE, # 이동은 삭제+생성으로 간주
path=source,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=error_msg
)
return {'success': False, 'error': error_msg}
# 대상 경로 Roots 프로토콜 권한 확인
dest_root = self._find_root_for_path(destination)
if not dest_root:
error_msg = f'대상 경로가 허용되지 않음: {destination}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 소스 경로 삭제 권한 확인
if not self._check_permission(source_root, FileOperation.DELETE):
error_msg = f'소스 경로 삭제 권한이 없습니다: {source}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 대상 경로 생성 권한 확인
if not self._check_permission(dest_root, FileOperation.CREATE):
error_msg = f'대상 경로 생성 권한이 없습니다: {destination}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 소스 파일/디렉토리 존재 확인
if not os.path.exists(source):
error_msg = f'소스가 존재하지 않습니다: {source}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 대상 경로 처리
destination = os.path.abspath(destination)
dest_dir = os.path.dirname(destination)
# 대상 디렉토리가 존재하지 않으면 생성
if not os.path.exists(dest_dir):
try:
os.makedirs(dest_dir, exist_ok=True)
self.logger.info(f"대상 디렉토리 생성: {dest_dir}")
except Exception as e:
error_msg = f'대상 디렉토리 생성 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
# 대상 파일이 이미 존재하는 경우 처리
if os.path.exists(destination):
if not overwrite:
error_msg = f'대상 파일이 이미 존재합니다 (overwrite=False): {destination}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
else:
self.logger.info(f"기존 대상 파일 덮어쓰기: {destination}")
# 소스와 대상이 같은 경우 확인
if os.path.abspath(source) == destination:
error_msg = f'소스와 대상이 동일합니다: {source}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 메타데이터 수집 (이동 전)
source_metadata = self._get_file_metadata(source)
is_directory = os.path.isdir(source)
# 원자적 이동 수행
try:
if is_directory:
# 디렉토리 이동
if os.path.exists(destination) and overwrite:
shutil.rmtree(destination)
shutil.move(source, destination)
operation_type = "directory_move"
else:
# 파일 이동
shutil.move(source, destination)
operation_type = "file_move"
# 대상 메타데이터 수집 (이동 후)
dest_metadata = self._get_file_metadata(destination)
# 감사 로그 기록 (성공)
self._log_audit_event(
operation=FileOperation.DELETE, # 소스에서 삭제
path=source,
user=os.getenv('USER', 'unknown'),
success=True
)
self._log_audit_event(
operation=FileOperation.CREATE, # 대상에서 생성
path=destination,
user=os.getenv('USER', 'unknown'),
success=True
)
self.logger.info(f"파일 이동 성공: {source} -> {destination}")
return {
'success': True,
'source': source,
'destination': destination,
'operation_type': operation_type,
'overwrite_occurred': os.path.exists(destination) and overwrite,
'source_metadata': asdict(source_metadata) if source_metadata else None,
'destination_metadata': asdict(dest_metadata) if dest_metadata else None
}
except Exception as e:
error_msg = f'파일 이동 실패: {str(e)}'
self.logger.error(error_msg)
# 감사 로그 기록 (실패)
self._log_audit_event(
operation=FileOperation.DELETE,
path=source,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=str(e)
)
return {'success': False, 'error': error_msg}
except Exception as e:
error_msg = f'파일 이동 처리 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
def search_files(self, path: str, pattern: str, excludePatterns: Optional[List[str]] = None) -> Dict[str, Any]:
"""파일 검색 (핵심 기능 10)"""
try:
self.logger.info(f"파일 검색 시도: {path}, 패턴: {pattern}")
# Roots 프로토콜 권한 확인
root_info = self._find_root_for_path(path)
if not root_info:
error_msg = f'경로가 허용되지 않음: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 읽기 권한 확인
if not self._check_permission(root_info, FileOperation.READ):
error_msg = f'읽기 권한이 없습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 경로 존재 확인
if not os.path.exists(path):
error_msg = f'경로가 존재하지 않습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 디렉토리 확인
if not os.path.isdir(path):
error_msg = f'검색 경로는 디렉토리여야 합니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 제외 패턴 기본값 설정
if excludePatterns is None:
excludePatterns = ['.*', '__pycache__', '*.pyc', '*.pyo', 'node_modules', '.git']
found_files = []
found_directories = []
# 재귀적 검색
for root, dirs, files in os.walk(path):
try:
# 제외 패턴에 따라 디렉토리 필터링
dirs[:] = [d for d in dirs if not any(fnmatch.fnmatch(d, pattern) for pattern in excludePatterns)]
# 파일 검색
for file in files:
# 제외 패턴 확인
if any(fnmatch.fnmatch(file, exc_pattern) for exc_pattern in excludePatterns):
continue
# 패턴 매치 확인 (대소문자 무시)
if fnmatch.fnmatch(file.lower(), pattern.lower()):
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, path)
# 파일 메타데이터 수집
try:
file_stat = os.stat(file_path)
found_files.append({
'path': file_path,
'relative_path': relative_path,
'name': file,
'size': file_stat.st_size,
'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
'type': 'file'
})
except OSError:
# 권한 부족이나 삭제된 파일 등의 경우
continue
# 디렉토리 검색
for dir_name in dirs:
if fnmatch.fnmatch(dir_name.lower(), pattern.lower()):
dir_path = os.path.join(root, dir_name)
relative_path = os.path.relpath(dir_path, path)
# 디렉토리 메타데이터 수집
try:
dir_stat = os.stat(dir_path)
found_directories.append({
'path': dir_path,
'relative_path': relative_path,
'name': dir_name,
'modified': datetime.fromtimestamp(dir_stat.st_mtime).isoformat(),
'type': 'directory'
})
except OSError:
continue
except PermissionError:
# 권한이 없는 디렉토리는 건너뛰기
continue
# 결과 정렬 (이름순)
found_files.sort(key=lambda x: x['name'].lower())
found_directories.sort(key=lambda x: x['name'].lower())
self.logger.info(f"파일 검색 완료: {len(found_files)}개 파일, {len(found_directories)}개 디렉토리")
return {
'success': True,
'search_path': path,
'pattern': pattern,
'exclude_patterns': excludePatterns,
'files': found_files,
'directories': found_directories,
'total_files': len(found_files),
'total_directories': len(found_directories)
}
except Exception as e:
error_msg = f'파일 검색 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
def get_file_info(self, path: str) -> Dict[str, Any]:
"""파일 정보 조회 (핵심 기능 11)"""
try:
self.logger.info(f"파일 정보 조회: {path}")
# Roots 프로토콜 권한 확인
root_info = self._find_root_for_path(path)
if not root_info:
error_msg = f'경로가 허용되지 않음: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 읽기 권한 확인
if not self._check_permission(root_info, FileOperation.READ):
error_msg = f'읽기 권한이 없습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 경로 존재 확인
if not os.path.exists(path):
error_msg = f'경로가 존재하지 않습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 파일 통계 정보 수집
stat_info = os.stat(path)
# 기본 정보
file_info = {
'success': True,
'path': path,
'name': os.path.basename(path),
'absolute_path': os.path.abspath(path),
'exists': True,
'type': 'directory' if os.path.isdir(path) else 'file',
'size': stat_info.st_size,
'created': datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
'modified': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
'accessed': datetime.fromtimestamp(stat_info.st_atime).isoformat(),
'permissions': {
'readable': os.access(path, os.R_OK),
'writable': os.access(path, os.W_OK),
'executable': os.access(path, os.X_OK),
'mode': oct(stat_info.st_mode)[-3:] # rwx permissions
},
'owner': {
'uid': stat_info.st_uid,
'gid': stat_info.st_gid
}
}
# 파일별 추가 정보
if os.path.isfile(path):
# MIME 타입 추정
mime_type, encoding = mimetypes.guess_type(path)
file_info['mime_type'] = mime_type
file_info['encoding'] = encoding
# 파일 확장자
file_info['extension'] = os.path.splitext(path)[1].lower()
# 해시 계산 (작은 파일만)
if stat_info.st_size < 1024 * 1024: # 1MB 미만
try:
with open(path, 'rb') as f:
content = f.read()
file_info['checksums'] = {
'md5': hashlib.md5(content).hexdigest(),
'sha256': hashlib.sha256(content).hexdigest()
}
except Exception:
file_info['checksums'] = None
else:
file_info['checksums'] = None
# 텍스트 파일 여부 확인
try:
with open(path, 'r', encoding='utf-8') as f:
f.read(100) # 첫 100문자만 읽어서 텍스트 파일인지 확인
file_info['is_text'] = True
except (UnicodeDecodeError, PermissionError):
file_info['is_text'] = False
elif os.path.isdir(path):
# 디렉토리 내용 개수
try:
contents = os.listdir(path)
files = [f for f in contents if os.path.isfile(os.path.join(path, f))]
dirs = [d for d in contents if os.path.isdir(os.path.join(path, d))]
file_info['directory_info'] = {
'total_items': len(contents),
'file_count': len(files),
'directory_count': len(dirs),
'is_empty': len(contents) == 0
}
except PermissionError:
file_info['directory_info'] = {'error': 'Permission denied'}
# 심볼릭 링크 정보
if os.path.islink(path):
file_info['is_symlink'] = True
file_info['symlink_target'] = os.readlink(path)
file_info['symlink_target_exists'] = os.path.exists(os.readlink(path))
else:
file_info['is_symlink'] = False
self.logger.info(f"파일 정보 조회 완료: {path}")
return file_info
except Exception as e:
error_msg = f'파일 정보 조회 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg, 'path': path}
def list_allowed_directories(self) -> Dict[str, Any]:
"""허용된 디렉토리 목록 조회 (핵심 기능 12)"""
try:
self.logger.info("허용된 디렉토리 목록 조회")
# Roots 정보를 기반으로 허용된 디렉토리 수집
allowed_dirs = []
for root_name, root_config in self.roots.items():
root_path = root_config.path
permission_level = root_config.permission_level
# 경로 존재 확인
exists = os.path.exists(root_path)
accessible = False
if exists:
try:
# 접근 가능성 확인
accessible = os.access(root_path, os.R_OK)
# 디렉토리 정보 수집
stat_info = os.stat(root_path)
dir_info = {
'root_name': root_name,
'path': root_path,
'name': os.path.basename(root_path) or root_path,
'absolute_path': os.path.abspath(root_path),
'permission_level': permission_level.value,
'exists': True,
'accessible': accessible,
'is_directory': os.path.isdir(root_path),
'created': datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
'modified': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
'permissions': {
'readable': os.access(root_path, os.R_OK),
'writable': os.access(root_path, os.W_OK),
'executable': os.access(root_path, os.X_OK)
}
}
# 디렉토리인 경우 내용 개수 확인
if os.path.isdir(root_path):
try:
contents = os.listdir(root_path)
dir_info['item_count'] = len(contents)
except PermissionError:
dir_info['item_count'] = None
else:
dir_info['item_count'] = None
except OSError as e:
dir_info = {
'root_name': root_name,
'path': root_path,
'name': os.path.basename(root_path) or root_path,
'permission_level': permission_level.value,
'exists': True,
'accessible': False,
'error': f'통계 정보 수집 실패: {str(e)}'
}
else:
dir_info = {
'root_name': root_name,
'path': root_path,
'name': os.path.basename(root_path) or root_path,
'permission_level': permission_level.value,
'exists': False,
'accessible': False,
'error': '경로가 존재하지 않음'
}
allowed_dirs.append(dir_info)
# 권한 레벨별 통계
permission_stats = {}
for permission_level in PermissionLevel:
count = sum(1 for d in allowed_dirs if d.get('permission_level') == permission_level.value)
permission_stats[permission_level.value] = count
# 접근 가능한 디렉토리 통계
accessible_count = sum(1 for d in allowed_dirs if d.get('accessible', False))
existing_count = sum(1 for d in allowed_dirs if d.get('exists', False))
self.logger.info(f"허용된 디렉토리 목록 조회 완료: 총 {len(allowed_dirs)}개")
return {
'success': True,
'total_directories': len(allowed_dirs),
'accessible_directories': accessible_count,
'existing_directories': existing_count,
'permission_statistics': permission_stats,
'directories': allowed_dirs,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
error_msg = f'허용된 디렉토리 목록 조회 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
def _format_file_size(self, size_bytes: int) -> str:
"""파일 크기를 사람이 읽기 쉬운 형태로 변환"""
if size_bytes == 0:
return "0 B"
units = ['B', 'KB', 'MB', 'GB', 'TB']
unit_index = 0
size = float(size_bytes)
while size >= 1024 and unit_index < len(units) - 1:
size /= 1024
unit_index += 1
if unit_index == 0:
return f"{int(size)} {units[unit_index]}"
else:
return f"{size:.1f} {units[unit_index]}"