Skip to main content
Glama
Skynotdie

MCP Localization Project

by Skynotdie
filesystem_advanced.py30.2 kB
#!/usr/bin/env python3 """ Filesystem MCP Advanced - 고급 기능 Complete Filesystem MCP의 고급 기능들을 포함합니다. - 디렉토리 트리 구조 조회 - 파일 이동/이름 변경 - 파일 검색 - 파일 정보 조회 - 허용된 디렉토리 목록 조회 """ import os import shutil import hashlib import mimetypes import fnmatch from datetime import datetime from typing import Dict, List, Optional, Any from dataclasses import asdict from filesystem_base import FilesystemMCPBase, FileOperation, PermissionLevel class FilesystemMCPAdvanced(FilesystemMCPBase): """파일시스템 MCP 고급 기능""" def directory_tree(self, path: str, max_depth: int = 10, include_hidden: bool = False) -> Dict[str, Any]: """디렉토리 트리 구조 조회 (핵심 기능 8)""" try: self.logger.info(f"디렉토리 트리 조회 시작: {path}") # Roots 프로토콜 권한 확인 root_name = self._find_root_for_path(path) if not root_name: error_msg = f'허용되지 않은 경로: {path}' self.logger.warning(error_msg) self._log_audit_event( operation=FileOperation.READ, path=path, user=os.getenv('USER', 'unknown'), success=False, error_message=error_msg ) return {'success': False, 'error': error_msg} # 읽기 권한 확인 if not self._check_permission(root_name, FileOperation.READ): error_msg = f'읽기 권한이 없습니다: {path}' self.logger.warning(error_msg) self._log_audit_event( operation=FileOperation.READ, path=path, user=os.getenv('USER', 'unknown'), success=False, error_message=error_msg ) return {'success': False, 'error': error_msg} # 경로 존재 확인 if not os.path.exists(path): error_msg = f'경로가 존재하지 않습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 디렉토리 확인 if not os.path.isdir(path): error_msg = f'디렉토리가 아닙니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 재귀적으로 트리 구조 생성 tree_data = self._build_directory_tree(path, max_depth, include_hidden, current_depth=0) # 감사 로그 기록 self._log_audit_event( operation=FileOperation.READ, path=path, user=os.getenv('USER', 'unknown'), success=True ) self.logger.info(f"디렉토리 트리 조회 완료: {path}") return { 'success': True, 'path': path, 'tree': tree_data, 'max_depth': max_depth, 'include_hidden': include_hidden } except Exception as e: error_msg = f'디렉토리 트리 조회 실패: {str(e)}' self.logger.error(error_msg) self._log_audit_event( operation=FileOperation.READ, path=path, user=os.getenv('USER', 'unknown'), success=False, error_message=str(e) ) return {'success': False, 'error': error_msg} def _build_directory_tree(self, path: str, max_depth: int, include_hidden: bool, current_depth: int = 0) -> Dict[str, Any]: """재귀적으로 디렉토리 트리 구조 생성""" try: tree_node = { 'name': os.path.basename(path), 'type': 'directory', 'path': path, 'children': [] } # 최대 깊이 확인 if current_depth >= max_depth: tree_node['truncated'] = True return tree_node # 디렉토리 내용 읽기 try: entries = os.listdir(path) except PermissionError: tree_node['error'] = 'Permission denied' return tree_node # 정렬 entries.sort() for entry in entries: # 숨김 파일 처리 if not include_hidden and entry.startswith('.'): continue entry_path = os.path.join(path, entry) try: if os.path.isdir(entry_path): # 하위 디렉토리 재귀 호출 child_tree = self._build_directory_tree( entry_path, max_depth, include_hidden, current_depth + 1 ) tree_node['children'].append(child_tree) else: # 파일 노드 생성 file_node = { 'name': entry, 'type': 'file', 'path': entry_path } # 파일 크기 추가 (선택적) try: stat_info = os.stat(entry_path) file_node['size'] = stat_info.st_size file_node['size_display'] = self._format_file_size(stat_info.st_size) file_node['modified'] = datetime.fromtimestamp(stat_info.st_mtime).isoformat() except (OSError, IOError): pass tree_node['children'].append(file_node) except (OSError, IOError) as e: # 접근할 수 없는 항목은 오류 노드로 추가 error_node = { 'name': entry, 'type': 'error', 'path': entry_path, 'error': str(e) } tree_node['children'].append(error_node) # 통계 정보 추가 total_files = sum(1 for child in tree_node['children'] if child['type'] == 'file') total_dirs = sum(1 for child in tree_node['children'] if child['type'] == 'directory') tree_node['stats'] = { 'total_files': total_files, 'total_directories': total_dirs, 'total_items': len(tree_node['children']) } return tree_node except Exception as e: self.logger.error(f"트리 구조 생성 실패 {path}: {e}") return { 'name': os.path.basename(path), 'type': 'error', 'path': path, 'error': str(e) } def move_file(self, source: str, destination: str, overwrite: bool = False) -> Dict[str, Any]: """파일 이동/이름 변경 (핵심 기능 9)""" try: self.logger.info(f"파일 이동 시도: {source} -> {destination}") # 소스 경로 Roots 프로토콜 권한 확인 source_root = self._find_root_for_path(source) if not source_root: error_msg = f'소스 경로가 허용되지 않음: {source}' self.logger.warning(error_msg) self._log_audit_event( operation=FileOperation.DELETE, # 이동은 삭제+생성으로 간주 path=source, user=os.getenv('USER', 'unknown'), success=False, error_message=error_msg ) return {'success': False, 'error': error_msg} # 대상 경로 Roots 프로토콜 권한 확인 dest_root = self._find_root_for_path(destination) if not dest_root: error_msg = f'대상 경로가 허용되지 않음: {destination}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 소스 경로 삭제 권한 확인 if not self._check_permission(source_root, FileOperation.DELETE): error_msg = f'소스 경로 삭제 권한이 없습니다: {source}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 대상 경로 생성 권한 확인 if not self._check_permission(dest_root, FileOperation.CREATE): error_msg = f'대상 경로 생성 권한이 없습니다: {destination}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 소스 파일/디렉토리 존재 확인 if not os.path.exists(source): error_msg = f'소스가 존재하지 않습니다: {source}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 대상 경로 처리 destination = os.path.abspath(destination) dest_dir = os.path.dirname(destination) # 대상 디렉토리가 존재하지 않으면 생성 if not os.path.exists(dest_dir): try: os.makedirs(dest_dir, exist_ok=True) self.logger.info(f"대상 디렉토리 생성: {dest_dir}") except Exception as e: error_msg = f'대상 디렉토리 생성 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} # 대상 파일이 이미 존재하는 경우 처리 if os.path.exists(destination): if not overwrite: error_msg = f'대상 파일이 이미 존재합니다 (overwrite=False): {destination}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} else: self.logger.info(f"기존 대상 파일 덮어쓰기: {destination}") # 소스와 대상이 같은 경우 확인 if os.path.abspath(source) == destination: error_msg = f'소스와 대상이 동일합니다: {source}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 메타데이터 수집 (이동 전) source_metadata = self._get_file_metadata(source) is_directory = os.path.isdir(source) # 원자적 이동 수행 try: if is_directory: # 디렉토리 이동 if os.path.exists(destination) and overwrite: shutil.rmtree(destination) shutil.move(source, destination) operation_type = "directory_move" else: # 파일 이동 shutil.move(source, destination) operation_type = "file_move" # 대상 메타데이터 수집 (이동 후) dest_metadata = self._get_file_metadata(destination) # 감사 로그 기록 (성공) self._log_audit_event( operation=FileOperation.DELETE, # 소스에서 삭제 path=source, user=os.getenv('USER', 'unknown'), success=True ) self._log_audit_event( operation=FileOperation.CREATE, # 대상에서 생성 path=destination, user=os.getenv('USER', 'unknown'), success=True ) self.logger.info(f"파일 이동 성공: {source} -> {destination}") return { 'success': True, 'source': source, 'destination': destination, 'operation_type': operation_type, 'overwrite_occurred': os.path.exists(destination) and overwrite, 'source_metadata': asdict(source_metadata) if source_metadata else None, 'destination_metadata': asdict(dest_metadata) if dest_metadata else None } except Exception as e: error_msg = f'파일 이동 실패: {str(e)}' self.logger.error(error_msg) # 감사 로그 기록 (실패) self._log_audit_event( operation=FileOperation.DELETE, path=source, user=os.getenv('USER', 'unknown'), success=False, error_message=str(e) ) return {'success': False, 'error': error_msg} except Exception as e: error_msg = f'파일 이동 처리 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} def search_files(self, path: str, pattern: str, excludePatterns: Optional[List[str]] = None) -> Dict[str, Any]: """파일 검색 (핵심 기능 10)""" try: self.logger.info(f"파일 검색 시도: {path}, 패턴: {pattern}") # Roots 프로토콜 권한 확인 root_info = self._find_root_for_path(path) if not root_info: error_msg = f'경로가 허용되지 않음: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 읽기 권한 확인 if not self._check_permission(root_info, FileOperation.READ): error_msg = f'읽기 권한이 없습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 경로 존재 확인 if not os.path.exists(path): error_msg = f'경로가 존재하지 않습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 디렉토리 확인 if not os.path.isdir(path): error_msg = f'검색 경로는 디렉토리여야 합니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 제외 패턴 기본값 설정 if excludePatterns is None: excludePatterns = ['.*', '__pycache__', '*.pyc', '*.pyo', 'node_modules', '.git'] found_files = [] found_directories = [] # 재귀적 검색 for root, dirs, files in os.walk(path): try: # 제외 패턴에 따라 디렉토리 필터링 dirs[:] = [d for d in dirs if not any(fnmatch.fnmatch(d, pattern) for pattern in excludePatterns)] # 파일 검색 for file in files: # 제외 패턴 확인 if any(fnmatch.fnmatch(file, exc_pattern) for exc_pattern in excludePatterns): continue # 패턴 매치 확인 (대소문자 무시) if fnmatch.fnmatch(file.lower(), pattern.lower()): file_path = os.path.join(root, file) relative_path = os.path.relpath(file_path, path) # 파일 메타데이터 수집 try: file_stat = os.stat(file_path) found_files.append({ 'path': file_path, 'relative_path': relative_path, 'name': file, 'size': file_stat.st_size, 'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(), 'type': 'file' }) except OSError: # 권한 부족이나 삭제된 파일 등의 경우 continue # 디렉토리 검색 for dir_name in dirs: if fnmatch.fnmatch(dir_name.lower(), pattern.lower()): dir_path = os.path.join(root, dir_name) relative_path = os.path.relpath(dir_path, path) # 디렉토리 메타데이터 수집 try: dir_stat = os.stat(dir_path) found_directories.append({ 'path': dir_path, 'relative_path': relative_path, 'name': dir_name, 'modified': datetime.fromtimestamp(dir_stat.st_mtime).isoformat(), 'type': 'directory' }) except OSError: continue except PermissionError: # 권한이 없는 디렉토리는 건너뛰기 continue # 결과 정렬 (이름순) found_files.sort(key=lambda x: x['name'].lower()) found_directories.sort(key=lambda x: x['name'].lower()) self.logger.info(f"파일 검색 완료: {len(found_files)}개 파일, {len(found_directories)}개 디렉토리") return { 'success': True, 'search_path': path, 'pattern': pattern, 'exclude_patterns': excludePatterns, 'files': found_files, 'directories': found_directories, 'total_files': len(found_files), 'total_directories': len(found_directories) } except Exception as e: error_msg = f'파일 검색 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} def get_file_info(self, path: str) -> Dict[str, Any]: """파일 정보 조회 (핵심 기능 11)""" try: self.logger.info(f"파일 정보 조회: {path}") # Roots 프로토콜 권한 확인 root_info = self._find_root_for_path(path) if not root_info: error_msg = f'경로가 허용되지 않음: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 읽기 권한 확인 if not self._check_permission(root_info, FileOperation.READ): error_msg = f'읽기 권한이 없습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 경로 존재 확인 if not os.path.exists(path): error_msg = f'경로가 존재하지 않습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 파일 통계 정보 수집 stat_info = os.stat(path) # 기본 정보 file_info = { 'success': True, 'path': path, 'name': os.path.basename(path), 'absolute_path': os.path.abspath(path), 'exists': True, 'type': 'directory' if os.path.isdir(path) else 'file', 'size': stat_info.st_size, 'created': datetime.fromtimestamp(stat_info.st_ctime).isoformat(), 'modified': datetime.fromtimestamp(stat_info.st_mtime).isoformat(), 'accessed': datetime.fromtimestamp(stat_info.st_atime).isoformat(), 'permissions': { 'readable': os.access(path, os.R_OK), 'writable': os.access(path, os.W_OK), 'executable': os.access(path, os.X_OK), 'mode': oct(stat_info.st_mode)[-3:] # rwx permissions }, 'owner': { 'uid': stat_info.st_uid, 'gid': stat_info.st_gid } } # 파일별 추가 정보 if os.path.isfile(path): # MIME 타입 추정 mime_type, encoding = mimetypes.guess_type(path) file_info['mime_type'] = mime_type file_info['encoding'] = encoding # 파일 확장자 file_info['extension'] = os.path.splitext(path)[1].lower() # 해시 계산 (작은 파일만) if stat_info.st_size < 1024 * 1024: # 1MB 미만 try: with open(path, 'rb') as f: content = f.read() file_info['checksums'] = { 'md5': hashlib.md5(content).hexdigest(), 'sha256': hashlib.sha256(content).hexdigest() } except Exception: file_info['checksums'] = None else: file_info['checksums'] = None # 텍스트 파일 여부 확인 try: with open(path, 'r', encoding='utf-8') as f: f.read(100) # 첫 100문자만 읽어서 텍스트 파일인지 확인 file_info['is_text'] = True except (UnicodeDecodeError, PermissionError): file_info['is_text'] = False elif os.path.isdir(path): # 디렉토리 내용 개수 try: contents = os.listdir(path) files = [f for f in contents if os.path.isfile(os.path.join(path, f))] dirs = [d for d in contents if os.path.isdir(os.path.join(path, d))] file_info['directory_info'] = { 'total_items': len(contents), 'file_count': len(files), 'directory_count': len(dirs), 'is_empty': len(contents) == 0 } except PermissionError: file_info['directory_info'] = {'error': 'Permission denied'} # 심볼릭 링크 정보 if os.path.islink(path): file_info['is_symlink'] = True file_info['symlink_target'] = os.readlink(path) file_info['symlink_target_exists'] = os.path.exists(os.readlink(path)) else: file_info['is_symlink'] = False self.logger.info(f"파일 정보 조회 완료: {path}") return file_info except Exception as e: error_msg = f'파일 정보 조회 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg, 'path': path} def list_allowed_directories(self) -> Dict[str, Any]: """허용된 디렉토리 목록 조회 (핵심 기능 12)""" try: self.logger.info("허용된 디렉토리 목록 조회") # Roots 정보를 기반으로 허용된 디렉토리 수집 allowed_dirs = [] for root_name, root_config in self.roots.items(): root_path = root_config.path permission_level = root_config.permission_level # 경로 존재 확인 exists = os.path.exists(root_path) accessible = False if exists: try: # 접근 가능성 확인 accessible = os.access(root_path, os.R_OK) # 디렉토리 정보 수집 stat_info = os.stat(root_path) dir_info = { 'root_name': root_name, 'path': root_path, 'name': os.path.basename(root_path) or root_path, 'absolute_path': os.path.abspath(root_path), 'permission_level': permission_level.value, 'exists': True, 'accessible': accessible, 'is_directory': os.path.isdir(root_path), 'created': datetime.fromtimestamp(stat_info.st_ctime).isoformat(), 'modified': datetime.fromtimestamp(stat_info.st_mtime).isoformat(), 'permissions': { 'readable': os.access(root_path, os.R_OK), 'writable': os.access(root_path, os.W_OK), 'executable': os.access(root_path, os.X_OK) } } # 디렉토리인 경우 내용 개수 확인 if os.path.isdir(root_path): try: contents = os.listdir(root_path) dir_info['item_count'] = len(contents) except PermissionError: dir_info['item_count'] = None else: dir_info['item_count'] = None except OSError as e: dir_info = { 'root_name': root_name, 'path': root_path, 'name': os.path.basename(root_path) or root_path, 'permission_level': permission_level.value, 'exists': True, 'accessible': False, 'error': f'통계 정보 수집 실패: {str(e)}' } else: dir_info = { 'root_name': root_name, 'path': root_path, 'name': os.path.basename(root_path) or root_path, 'permission_level': permission_level.value, 'exists': False, 'accessible': False, 'error': '경로가 존재하지 않음' } allowed_dirs.append(dir_info) # 권한 레벨별 통계 permission_stats = {} for permission_level in PermissionLevel: count = sum(1 for d in allowed_dirs if d.get('permission_level') == permission_level.value) permission_stats[permission_level.value] = count # 접근 가능한 디렉토리 통계 accessible_count = sum(1 for d in allowed_dirs if d.get('accessible', False)) existing_count = sum(1 for d in allowed_dirs if d.get('exists', False)) self.logger.info(f"허용된 디렉토리 목록 조회 완료: 총 {len(allowed_dirs)}개") return { 'success': True, 'total_directories': len(allowed_dirs), 'accessible_directories': accessible_count, 'existing_directories': existing_count, 'permission_statistics': permission_stats, 'directories': allowed_dirs, 'timestamp': datetime.now().isoformat() } except Exception as e: error_msg = f'허용된 디렉토리 목록 조회 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} def _format_file_size(self, size_bytes: int) -> str: """파일 크기를 사람이 읽기 쉬운 형태로 변환""" if size_bytes == 0: return "0 B" units = ['B', 'KB', 'MB', 'GB', 'TB'] unit_index = 0 size = float(size_bytes) while size >= 1024 and unit_index < len(units) - 1: size /= 1024 unit_index += 1 if unit_index == 0: return f"{int(size)} {units[unit_index]}" else: return f"{size:.1f} {units[unit_index]}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server