Skip to main content
Glama
Skynotdie

MCP Localization Project

by Skynotdie
complete_filesystem_mcp.py79.8 kB
#!/usr/bin/env python3 """ Complete Filesystem MCP - 완전한 파일시스템 MCP 도구 mcp_principles/02-1_filesystem_mcp.md 기반 완전 구현 - 12개 핵심 기능 모두 구현 - Roots 프로토콜 완전 지원 - 원자적 쓰기 및 메타데이터 관리 - CLAUDE.md 지침 완전 반영 """ import os import sys import sqlite3 import json import hashlib import mimetypes import tempfile import shutil import threading import time import logging from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any, Tuple, Union from dataclasses import dataclass, asdict from enum import Enum import glob import fnmatch # 로컬 모델 임포트 from .models import ( PermissionLevel, FileOperation, RootConfig, FileMetadata, AuditEvent ) # 로깅 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class CompleteFilesystemMCP: """완전한 파일시스템 MCP 구현""" def __init__(self, db_path: Optional[str] = None): """초기화""" # 로거 먼저 설정 self.logger = logging.getLogger("CompleteFilesystemMCP") # DB 경로 설정 self.db_path = db_path or tempfile.mktemp(suffix='.db') # Roots 설정 self.roots: Dict[str, RootConfig] = {} # 감사 로그 저장소 self.audit_log: List[AuditEvent] = [] # 스레드 안전성을 위한 락 self.file_lock = threading.RLock() # GPU 환경변수 설정 (CLAUDE.md 지침) self._setup_gpu_environment() # 데이터베이스 초기화 self._init_database() # 기본 루트 추가 self._setup_default_roots() self.logger.info(f"CompleteFilesystemMCP 초기화 완료: DB={self.db_path}") def _setup_gpu_environment(self): """GPU 환경변수 설정 (CLAUDE.md 지침 반영)""" try: os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # 경고 숨김 os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true' # 메모리 증분 할당 os.environ['CUDA_VISIBLE_DEVICES'] = '0' # GPU 0 사용 os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async' self.logger.debug("GPU 환경변수 설정 완료") except Exception as e: self.logger.warning(f"GPU 환경변수 설정 실패: {e}") def _init_database(self): """데이터베이스 초기화""" try: with self.file_lock: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Roots 테이블 cursor.execute(''' CREATE TABLE IF NOT EXISTS roots ( name TEXT PRIMARY KEY, path TEXT NOT NULL, permission_level TEXT NOT NULL, allowed_operations TEXT NOT NULL, config TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 파일 메타데이터 테이블 cursor.execute(''' CREATE TABLE IF NOT EXISTS file_metadata ( path TEXT PRIMARY KEY, name TEXT NOT NULL, size INTEGER NOT NULL, modified_time TIMESTAMP NOT NULL, created_time TIMESTAMP NOT NULL, mime_type TEXT NOT NULL, permissions TEXT NOT NULL, file_hash TEXT NOT NULL, is_directory BOOLEAN NOT NULL, is_symlink BOOLEAN NOT NULL, symlink_target TEXT, last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 감사 로그 테이블 cursor.execute(''' CREATE TABLE IF NOT EXISTS audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TIMESTAMP NOT NULL, operation TEXT NOT NULL, path TEXT NOT NULL, user TEXT NOT NULL, success BOOLEAN NOT NULL, file_size INTEGER, error_message TEXT, details TEXT ) ''') conn.commit() conn.close() self.logger.info("데이터베이스 초기화 완료") except Exception as e: self.logger.error(f"데이터베이스 초기화 실패: {e}") raise def _setup_default_roots(self): """기본 Root 설정""" try: # 현재 작업 디렉토리 current_dir = os.getcwd() default_root = RootConfig( name="current_directory", path=current_dir, permission_level=PermissionLevel.READ_WRITE, allowed_operations=[ FileOperation.READ, FileOperation.WRITE, FileOperation.CREATE, FileOperation.DELETE, FileOperation.MOVE, FileOperation.SEARCH ], max_file_size=50 * 1024 * 1024, # 50MB audit_enabled=True ) self.add_root(default_root) # 홈 디렉토리 (읽기 전용) home_dir = os.path.expanduser("~") if home_dir != current_dir: home_root = RootConfig( name="home_directory", path=home_dir, permission_level=PermissionLevel.READ_ONLY, allowed_operations=[ FileOperation.READ, FileOperation.SEARCH ], max_file_size=10 * 1024 * 1024, # 10MB audit_enabled=True ) self.add_root(home_root) except Exception as e: self.logger.warning(f"기본 Root 설정 실패: {e}") def add_root(self, config: RootConfig) -> bool: """새로운 Root 경로 추가""" try: self.logger.info(f"Root 추가 시도: {config.name} -> {config.path}") # 경로 존재 확인 if not os.path.exists(config.path): self.logger.error(f"경로가 존재하지 않습니다: {config.path}") return False # 절대 경로로 변환 config.path = os.path.abspath(config.path) # 권한 확인 if not os.access(config.path, os.R_OK): self.logger.error(f"읽기 권한이 없습니다: {config.path}") return False with self.file_lock: # 데이터베이스에 저장 conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT OR REPLACE INTO roots (name, path, permission_level, allowed_operations, config) VALUES (?, ?, ?, ?, ?) ''', ( config.name, config.path, config.permission_level.value, json.dumps([op.value for op in config.allowed_operations]), json.dumps(config.to_dict()) )) conn.commit() conn.close() # 메모리에 저장 self.roots[config.name] = config self.logger.info(f"Root '{config.name}' 추가 완료: {config.path}") return True except Exception as e: self.logger.error(f"Root 추가 실패: {e}") return False def _calculate_file_hash(self, filepath: str) -> str: """파일 해시 계산""" try: hasher = hashlib.sha256() with open(filepath, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): hasher.update(chunk) return hasher.hexdigest() except Exception as e: self.logger.error(f"파일 해시 계산 실패 {filepath}: {e}") return "" def _get_file_metadata(self, filepath: str) -> Optional[FileMetadata]: """파일 메타데이터 수집""" try: path_obj = Path(filepath) if not path_obj.exists(): return None stat_info = path_obj.stat() # 파일 해시 계산 (디렉토리가 아닌 경우만) file_hash = "" if path_obj.is_file(): file_hash = self._calculate_file_hash(filepath) # MIME 타입 감지 mime_type, _ = mimetypes.guess_type(filepath) if not mime_type: mime_type = "application/octet-stream" if path_obj.is_file() else "inode/directory" # 심볼릭 링크 대상 symlink_target = None if path_obj.is_symlink(): try: symlink_target = str(path_obj.readlink()) except: symlink_target = None return FileMetadata( path=filepath, name=path_obj.name, size=stat_info.st_size, modified_time=datetime.fromtimestamp(stat_info.st_mtime), created_time=datetime.fromtimestamp(stat_info.st_ctime), mime_type=mime_type, permissions=oct(stat_info.st_mode)[-3:], file_hash=file_hash, is_directory=path_obj.is_dir(), is_symlink=path_obj.is_symlink(), symlink_target=symlink_target ) except Exception as e: self.logger.error(f"파일 메타데이터 수집 실패 {filepath}: {e}") return None def _log_audit_event(self, operation: FileOperation, path: str, user: str, success: bool, file_size: Optional[int] = None, error_message: Optional[str] = None, details: Optional[Dict] = None): """감사 이벤트 로깅""" try: event = AuditEvent( timestamp=datetime.now(), operation=operation, path=path, user=user, success=success, file_size=file_size, error_message=error_message, details=details ) # 메모리에 저장 (제한된 크기) self.audit_log.append(event) if len(self.audit_log) > 1000: # 최대 1000개 유지 self.audit_log.pop(0) # 데이터베이스에 저장 with self.file_lock: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO audit_log (timestamp, operation, path, user, success, file_size, error_message, details) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', ( event.timestamp, event.operation.value, event.path, event.user, event.success, event.file_size, event.error_message, json.dumps(event.details) if event.details else None )) conn.commit() conn.close() self.logger.debug(f"감사 이벤트 로깅: {operation.value} {path} {'성공' if success else '실패'}") except Exception as e: self.logger.error(f"감사 이벤트 로깅 실패: {e}") def _check_permission(self, root_name: str, operation: FileOperation) -> bool: """권한 확인""" if root_name not in self.roots: self.logger.debug(f"Root '{root_name}' 찾을 수 없음") return False config = self.roots[root_name] has_permission = operation in config.allowed_operations self.logger.debug(f"권한 확인: {root_name} {operation.value} -> {has_permission}") return has_permission def _find_root_for_path(self, path: str) -> Optional[str]: """경로에 해당하는 Root 찾기""" abs_path = os.path.abspath(path) self.logger.debug(f"Root 찾기: {path} -> {abs_path}") # 가장 구체적인 매치 찾기 (가장 긴 공통 경로) best_match = None best_match_length = 0 for root_name, config in self.roots.items(): self.logger.debug(f" 확인: {config.path} 하위인가?") if abs_path.startswith(config.path): match_length = len(config.path) if match_length > best_match_length: best_match = root_name best_match_length = match_length self.logger.debug(f" 새로운 최적 매치: {root_name} (길이: {match_length})") if best_match: self.logger.debug(f" 최종 매치: {best_match}") else: self.logger.debug(" 매치되는 Root 없음") return best_match def _check_file_size_limit(self, filepath: str, root_name: str) -> bool: """파일 크기 제한 확인""" try: if not os.path.exists(filepath): return True # 새 파일은 크기 제한 없음 file_size = os.path.getsize(filepath) max_size = self.roots[root_name].max_file_size if file_size > max_size: self.logger.warning(f"파일 크기 제한 초과: {file_size} > {max_size}") return False return True except Exception as e: self.logger.error(f"파일 크기 확인 실패: {e}") return False # ======================================== # 12개 핵심 기능 구현 # ======================================== def filesystem__read_file(self, path: str, encoding: str = 'utf-8', head: Optional[int] = None, tail: Optional[int] = None) -> Dict[str, Any]: """파일 읽기 (핵심 기능 1)""" try: self.logger.info(f"파일 읽기 시도: {path}") # 절대 경로로 변환 abs_path = os.path.abspath(path) # Root 찾기 및 권한 확인 root_name = self._find_root_for_path(abs_path) if not root_name: error_msg = f'허용되지 않은 경로: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} if not self._check_permission(root_name, FileOperation.READ): error_msg = f'읽기 권한이 없습니다: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 파일 존재 확인 if not os.path.exists(abs_path): error_msg = f'파일이 존재하지 않습니다: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} if not os.path.isfile(abs_path): error_msg = f'디렉토리입니다: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 파일 크기 확인 if not self._check_file_size_limit(abs_path, root_name): error_msg = f'파일 크기 제한 초과: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 파일 읽기 with self.file_lock: with open(abs_path, 'r', encoding=encoding) as f: if head and tail: # head와 tail 둘 다 지정된 경우 에러 error_msg = 'head와 tail은 동시에 사용할 수 없습니다' return {'success': False, 'error': error_msg} elif head: lines = [] for i, line in enumerate(f): if i >= head: break lines.append(line) content = ''.join(lines) elif tail: # 모든 라인을 읽고 마지막 N개만 가져오기 all_lines = f.readlines() content = ''.join(all_lines[-tail:]) if len(all_lines) >= tail else ''.join(all_lines) else: content = f.read() # 메타데이터 수집 metadata = self._get_file_metadata(abs_path) # 감사 로그 기록 self._log_audit_event( operation=FileOperation.READ, path=abs_path, user=os.getenv('USER', 'unknown'), success=True, file_size=len(content), details={'encoding': encoding, 'head': head, 'tail': tail} ) self.logger.info(f"파일 읽기 성공: {len(content)} 문자") return { 'success': True, 'content': content, 'metadata': asdict(metadata) if metadata else None, 'encoding': encoding, 'path': abs_path } except Exception as e: error_msg = f'파일 읽기 실패: {str(e)}' self.logger.error(error_msg) self._log_audit_event(FileOperation.READ, path, os.getenv('USER', 'unknown'), False, error_message=str(e)) return {'success': False, 'error': error_msg} def read_multiple_files(self, paths: List[str], encoding: str = 'utf-8') -> Dict[str, Any]: """여러 파일 읽기 (핵심 기능 2)""" try: self.logger.info(f"여러 파일 읽기 시도: {len(paths)}개 파일") results = {} successful_reads = 0 for path in paths: result = self.filesystem__read_file(path, encoding) results[path] = result if result['success']: successful_reads += 1 return { 'success': True, 'results': results, 'total_files': len(paths), 'successful_reads': successful_reads, 'failed_reads': len(paths) - successful_reads } except Exception as e: error_msg = f'여러 파일 읽기 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} def filesystem__write_file(self, path: str, content: str, encoding: str = 'utf-8', create_dirs: bool = True) -> Dict[str, Any]: """파일 쓰기 (핵심 기능 3) - 원자적 쓰기""" try: self.logger.info(f"파일 쓰기 시도: {path}") # 절대 경로로 변환 abs_path = os.path.abspath(path) file_path = Path(abs_path) # 디렉토리 생성 필요 시 if create_dirs and not file_path.parent.exists(): file_path.parent.mkdir(parents=True, exist_ok=True) # Root 찾기 및 권한 확인 root_name = self._find_root_for_path(abs_path) if not root_name: error_msg = f'허용되지 않은 경로: {abs_path}' self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} operation = FileOperation.CREATE if not file_path.exists() else FileOperation.WRITE if not self._check_permission(root_name, operation): error_msg = f'쓰기 권한이 없습니다: {abs_path}' self._log_audit_event(operation, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 내용 크기 확인 content_size = len(content.encode(encoding)) max_size = self.roots[root_name].max_file_size if content_size > max_size: error_msg = f'내용 크기가 제한을 초과합니다: {content_size} > {max_size}' self._log_audit_event(operation, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 원자적 쓰기 (임시 파일 사용) temp_path = file_path.with_suffix(file_path.suffix + '.tmp') with self.file_lock: # 임시 파일에 쓰기 with open(temp_path, 'w', encoding=encoding) as f: f.write(content) f.flush() os.fsync(f.fileno()) # 디스크에 강제 쓰기 # 원자적 이동 shutil.move(temp_path, file_path) # 메타데이터 수집 metadata = self._get_file_metadata(abs_path) # 감사 로그 기록 self._log_audit_event( operation=operation, path=abs_path, user=os.getenv('USER', 'unknown'), success=True, file_size=content_size, details={'encoding': encoding, 'create_dirs': create_dirs} ) self.logger.info(f"파일 쓰기 성공: {content_size} 바이트") return { 'success': True, 'path': abs_path, 'size': content_size, 'metadata': asdict(metadata) if metadata else None, 'encoding': encoding } except Exception as e: # 임시 파일 정리 temp_path = Path(path).with_suffix(Path(path).suffix + '.tmp') if temp_path.exists(): try: temp_path.unlink() except: pass error_msg = f'파일 쓰기 실패: {str(e)}' self.logger.error(error_msg) self._log_audit_event(FileOperation.WRITE, path, os.getenv('USER', 'unknown'), False, error_message=str(e)) return {'success': False, 'error': error_msg} def edit_file(self, path: str, old_text: str, new_text: str, encoding: str = 'utf-8') -> Dict[str, Any]: """파일 편집 (핵심 기능 4) - 텍스트 교체""" try: self.logger.info(f"파일 편집 시도: {path}") # 절대 경로로 변환 abs_path = os.path.abspath(path) # Root 찾기 및 권한 확인 root_name = self._find_root_for_path(abs_path) if not root_name: error_msg = f'허용되지 않은 경로: {abs_path}' self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} if not self._check_permission(root_name, FileOperation.WRITE): error_msg = f'편집 권한이 없습니다: {abs_path}' self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 파일 존재 확인 if not os.path.exists(abs_path): error_msg = f'파일이 존재하지 않습니다: {abs_path}' self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 파일 읽기 with self.file_lock: with open(abs_path, 'r', encoding=encoding) as f: original_content = f.read() # 텍스트 교체 if old_text not in original_content: error_msg = f'교체할 텍스트를 찾을 수 없습니다: {old_text[:50]}...' self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} new_content = original_content.replace(old_text, new_text) changes_count = original_content.count(old_text) # 변경사항이 있는지 확인 if new_content == original_content: return { 'success': True, 'path': abs_path, 'changes_made': False, 'changes_count': 0, 'message': '변경사항이 없습니다' } # 크기 제한 확인 new_content_size = len(new_content.encode(encoding)) max_size = self.roots[root_name].max_file_size if new_content_size > max_size: error_msg = f'편집 후 파일 크기가 제한을 초과합니다: {new_content_size} > {max_size}' self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 원자적 쓰기 file_path = Path(abs_path) temp_path = file_path.with_suffix(file_path.suffix + '.tmp') with self.file_lock: with open(temp_path, 'w', encoding=encoding) as f: f.write(new_content) f.flush() os.fsync(f.fileno()) shutil.move(temp_path, file_path) # 메타데이터 수집 metadata = self._get_file_metadata(abs_path) # 감사 로그 기록 self._log_audit_event( operation=FileOperation.WRITE, path=abs_path, user=os.getenv('USER', 'unknown'), success=True, file_size=new_content_size, details={ 'edit_type': 'text_replace', 'changes_count': changes_count, 'old_size': len(original_content), 'new_size': len(new_content) } ) self.logger.info(f"파일 편집 성공: {changes_count}곳 변경") return { 'success': True, 'path': abs_path, 'changes_made': True, 'changes_count': changes_count, 'original_size': len(original_content), 'new_size': len(new_content), 'metadata': asdict(metadata) if metadata else None } except Exception as e: # 임시 파일 정리 temp_path = Path(path).with_suffix(Path(path).suffix + '.tmp') if temp_path.exists(): try: temp_path.unlink() except: pass error_msg = f'파일 편집 실패: {str(e)}' self.logger.error(error_msg) self._log_audit_event(FileOperation.WRITE, path, os.getenv('USER', 'unknown'), False, error_message=str(e)) return {'success': False, 'error': error_msg} def create_directory(self, path: str, parents: bool = True, exist_ok: bool = True) -> Dict[str, Any]: """디렉토리 생성 (핵심 기능 5)""" try: self.logger.info(f"디렉토리 생성 시도: {path}") # 절대 경로로 변환 abs_path = os.path.abspath(path) dir_path = Path(abs_path) # Root 찾기 및 권한 확인 root_name = self._find_root_for_path(abs_path) if not root_name: error_msg = f'허용되지 않은 경로: {abs_path}' self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} if not self._check_permission(root_name, FileOperation.CREATE): error_msg = f'생성 권한이 없습니다: {abs_path}' self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 이미 존재하는지 확인 if dir_path.exists(): if dir_path.is_dir(): if exist_ok: self.logger.info(f"디렉토리가 이미 존재함: {abs_path}") metadata = self._get_file_metadata(abs_path) return { 'success': True, 'path': abs_path, 'created': False, 'message': '디렉토리가 이미 존재합니다', 'metadata': asdict(metadata) if metadata else None } else: error_msg = f'디렉토리가 이미 존재합니다: {abs_path}' self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} else: error_msg = f'파일이 이미 존재합니다 (디렉토리가 아님): {abs_path}' self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 디렉토리 생성 with self.file_lock: try: dir_path.mkdir(parents=parents, exist_ok=exist_ok) created = True except FileExistsError: if exist_ok: created = False else: raise except FileNotFoundError: if not parents: error_msg = f'상위 디렉토리가 존재하지 않습니다: {abs_path}' self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} else: raise # 메타데이터 수집 metadata = self._get_file_metadata(abs_path) # 감사 로그 기록 self._log_audit_event( operation=FileOperation.CREATE, path=abs_path, user=os.getenv('USER', 'unknown'), success=True, details={ 'type': 'directory', 'parents': parents, 'exist_ok': exist_ok, 'created': created } ) self.logger.info(f"디렉토리 생성 성공: {abs_path}") return { 'success': True, 'path': abs_path, 'created': created, 'metadata': asdict(metadata) if metadata else None } except PermissionError: error_msg = f'디렉토리 생성 권한이 없습니다: {path}' self.logger.error(error_msg) self._log_audit_event(FileOperation.CREATE, path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} except Exception as e: error_msg = f'디렉토리 생성 실패: {str(e)}' self.logger.error(error_msg) self._log_audit_event(FileOperation.CREATE, path, os.getenv('USER', 'unknown'), False, error_message=str(e)) return {'success': False, 'error': error_msg} def filesystem__list_directory(self, path: str, show_hidden: bool = False) -> Dict[str, Any]: """디렉토리 목록 조회 (핵심 기능 6)""" try: self.logger.info(f"디렉토리 목록 조회: {path}") # 절대 경로로 변환 abs_path = os.path.abspath(path) # Root 찾기 및 권한 확인 root_name = self._find_root_for_path(abs_path) if not root_name: error_msg = f'허용되지 않은 경로: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} if not self._check_permission(root_name, FileOperation.READ): error_msg = f'읽기 권한이 없습니다: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 디렉토리 존재 및 타입 확인 if not os.path.exists(abs_path): error_msg = f'디렉토리가 존재하지 않습니다: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} if not os.path.isdir(abs_path): error_msg = f'디렉토리가 아닙니다: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 디렉토리 내용 조회 items = [] with self.file_lock: try: for item_name in os.listdir(abs_path): # 숨김 파일 처리 if not show_hidden and item_name.startswith('.'): continue item_path = os.path.join(abs_path, item_name) try: # 기본 정보 item_info = { 'name': item_name, 'path': item_path, 'type': '[DIR]' if os.path.isdir(item_path) else '[FILE]' } # 메타데이터 수집 (선택적) metadata = self._get_file_metadata(item_path) if metadata: item_info.update({ 'size': metadata.size, 'modified': metadata.modified_time.isoformat(), 'permissions': metadata.permissions, 'is_symlink': metadata.is_symlink }) if metadata.is_symlink and metadata.symlink_target: item_info['symlink_target'] = metadata.symlink_target items.append(item_info) except (PermissionError, OSError) as e: # 개별 파일 접근 실패는 경고만 로그 self.logger.warning(f"항목 정보 수집 실패 {item_path}: {e}") items.append({ 'name': item_name, 'path': item_path, 'type': '[UNKNOWN]', 'error': str(e) }) continue except PermissionError: error_msg = f'디렉토리 읽기 권한이 없습니다: {abs_path}' self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg) return {'success': False, 'error': error_msg} # 정렬 (디렉토리 먼저, 그 다음 파일명순) items.sort(key=lambda x: (x['type'] != '[DIR]', x['name'].lower())) # 감사 로그 기록 self._log_audit_event( operation=FileOperation.READ, path=abs_path, user=os.getenv('USER', 'unknown'), success=True, details={ 'type': 'directory_listing', 'item_count': len(items), 'show_hidden': show_hidden } ) self.logger.info(f"디렉토리 목록 조회 성공: {len(items)}개 항목") return { 'success': True, 'path': abs_path, 'items': items, 'count': len(items), 'show_hidden': show_hidden } except Exception as e: error_msg = f'디렉토리 목록 조회 실패: {str(e)}' self.logger.error(error_msg) self._log_audit_event(FileOperation.READ, path, os.getenv('USER', 'unknown'), False, error_message=str(e)) return {'success': False, 'error': error_msg} def list_directory_with_sizes(self, path: str, sort_by: str = 'name', show_hidden: bool = False) -> Dict[str, Any]: """크기를 포함한 디렉토리 목록 조회 (핵심 기능 7)""" try: self.logger.info(f"크기별 디렉토리 목록 조회: {path}") # 기본 디렉토리 목록 조회 base_result = self.filesystem__list_directory(path, show_hidden) if not base_result['success']: return base_result items = base_result['items'] # 크기 정보 추가 및 강화 enhanced_items = [] total_size = 0 for item in items: enhanced_item = item.copy() try: item_path = item['path'] if item['type'] == '[DIR]': # 디렉토리의 경우 하위 항목 개수 계산 try: subitem_count = len([f for f in os.listdir(item_path) if show_hidden or not f.startswith('.')]) enhanced_item['subitem_count'] = subitem_count enhanced_item['size_display'] = f"{subitem_count} items" except (PermissionError, OSError): enhanced_item['subitem_count'] = 0 enhanced_item['size_display'] = "No access" else: # 파일의 경우 크기를 사람이 읽기 쉬운 형태로 변환 file_size = item.get('size', 0) enhanced_item['size_display'] = self._format_file_size(file_size) total_size += file_size except Exception as e: self.logger.warning(f"항목 크기 정보 수집 실패 {item['path']}: {e}") enhanced_item['size_display'] = "Unknown" enhanced_items.append(enhanced_item) # 정렬 if sort_by == 'size': # 크기순 정렬 (디렉토리는 하위 항목 수, 파일은 크기) enhanced_items.sort(key=lambda x: ( x['type'] != '[FILE]', # 파일 우선 -(x.get('size', 0) if x['type'] == '[FILE]' else x.get('subitem_count', 0)) )) elif sort_by == 'name': # 이름순 정렬 (기본) enhanced_items.sort(key=lambda x: (x['type'] != '[DIR]', x['name'].lower())) elif sort_by == 'modified': # 수정일순 정렬 enhanced_items.sort(key=lambda x: x.get('modified', ''), reverse=True) # 감사 로그 기록 self._log_audit_event( operation=FileOperation.READ, path=path, user=os.getenv('USER', 'unknown'), success=True, details={ 'type': 'directory_listing_with_sizes', 'sort_by': sort_by, 'total_size': total_size, 'item_count': len(enhanced_items) } ) self.logger.info(f"크기별 디렉토리 목록 조회 성공: {len(enhanced_items)}개 항목") return { 'success': True, 'path': path, 'items': enhanced_items, 'count': len(enhanced_items), 'total_size': total_size, 'total_size_display': self._format_file_size(total_size), 'sort_by': sort_by, 'show_hidden': show_hidden } except Exception as e: error_msg = f'크기별 디렉토리 목록 조회 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} def _format_file_size(self, size_bytes: int) -> str: """파일 크기를 사람이 읽기 쉬운 형태로 변환""" if size_bytes == 0: return "0 B" units = ['B', 'KB', 'MB', 'GB', 'TB'] unit_index = 0 size = float(size_bytes) while size >= 1024 and unit_index < len(units) - 1: size /= 1024 unit_index += 1 if unit_index == 0: return f"{int(size)} {units[unit_index]}" else: return f"{size:.1f} {units[unit_index]}" def directory_tree(self, path: str, max_depth: int = 10, include_hidden: bool = False) -> Dict[str, Any]: """디렉토리 트리 구조 조회 (핵심 기능 8)""" try: self.logger.info(f"디렉토리 트리 조회 시작: {path}") # Roots 프로토콜 권한 확인 root_name = self._find_root_for_path(path) if not root_name: error_msg = f'허용되지 않은 경로: {path}' self.logger.warning(error_msg) self._log_audit_event( operation=FileOperation.READ, path=path, user=os.getenv('USER', 'unknown'), success=False, error_message=error_msg ) return {'success': False, 'error': error_msg} # 읽기 권한 확인 if not self._check_permission(root_name, FileOperation.READ): error_msg = f'읽기 권한이 없습니다: {path}' self.logger.warning(error_msg) self._log_audit_event( operation=FileOperation.READ, path=path, user=os.getenv('USER', 'unknown'), success=False, error_message=error_msg ) return {'success': False, 'error': error_msg} # 경로 존재 확인 if not os.path.exists(path): error_msg = f'경로가 존재하지 않습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 디렉토리 확인 if not os.path.isdir(path): error_msg = f'디렉토리가 아닙니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 재귀적으로 트리 구조 생성 tree_data = self._build_directory_tree(path, max_depth, include_hidden, current_depth=0) # 감사 로그 기록 self._log_audit_event( operation=FileOperation.READ, path=path, user=os.getenv('USER', 'unknown'), success=True ) self.logger.info(f"디렉토리 트리 조회 완료: {path}") return { 'success': True, 'path': path, 'tree': tree_data, 'max_depth': max_depth, 'include_hidden': include_hidden } except Exception as e: error_msg = f'디렉토리 트리 조회 실패: {str(e)}' self.logger.error(error_msg) self._log_audit_event( operation=FileOperation.READ, path=path, user=os.getenv('USER', 'unknown'), success=False, error_message=str(e) ) return {'success': False, 'error': error_msg} def _build_directory_tree(self, path: str, max_depth: int, include_hidden: bool, current_depth: int = 0) -> Dict[str, Any]: """재귀적으로 디렉토리 트리 구조 생성""" try: tree_node = { 'name': os.path.basename(path), 'type': 'directory', 'path': path, 'children': [] } # 최대 깊이 확인 if current_depth >= max_depth: tree_node['truncated'] = True return tree_node # 디렉토리 내용 읽기 try: entries = os.listdir(path) except PermissionError: tree_node['error'] = 'Permission denied' return tree_node # 정렬 entries.sort() for entry in entries: # 숨김 파일 처리 if not include_hidden and entry.startswith('.'): continue entry_path = os.path.join(path, entry) try: if os.path.isdir(entry_path): # 하위 디렉토리 재귀 호출 child_tree = self._build_directory_tree( entry_path, max_depth, include_hidden, current_depth + 1 ) tree_node['children'].append(child_tree) else: # 파일 노드 생성 file_node = { 'name': entry, 'type': 'file', 'path': entry_path } # 파일 크기 추가 (선택적) try: stat_info = os.stat(entry_path) file_node['size'] = stat_info.st_size file_node['size_display'] = self._format_file_size(stat_info.st_size) file_node['modified'] = datetime.fromtimestamp(stat_info.st_mtime).isoformat() except (OSError, IOError): pass tree_node['children'].append(file_node) except (OSError, IOError) as e: # 접근할 수 없는 항목은 오류 노드로 추가 error_node = { 'name': entry, 'type': 'error', 'path': entry_path, 'error': str(e) } tree_node['children'].append(error_node) # 통계 정보 추가 total_files = sum(1 for child in tree_node['children'] if child['type'] == 'file') total_dirs = sum(1 for child in tree_node['children'] if child['type'] == 'directory') tree_node['stats'] = { 'total_files': total_files, 'total_directories': total_dirs, 'total_items': len(tree_node['children']) } return tree_node except Exception as e: self.logger.error(f"트리 구조 생성 실패 {path}: {e}") return { 'name': os.path.basename(path), 'type': 'error', 'path': path, 'error': str(e) } def move_file(self, source: str, destination: str, overwrite: bool = False) -> Dict[str, Any]: """파일 이동/이름 변경 (핵심 기능 9)""" try: self.logger.info(f"파일 이동 시도: {source} -> {destination}") # 소스 경로 Roots 프로토콜 권한 확인 source_root = self._find_root_for_path(source) if not source_root: error_msg = f'소스 경로가 허용되지 않음: {source}' self.logger.warning(error_msg) self._log_audit_event( operation=FileOperation.DELETE, # 이동은 삭제+생성으로 간주 path=source, user=os.getenv('USER', 'unknown'), success=False, error_message=error_msg ) return {'success': False, 'error': error_msg} # 대상 경로 Roots 프로토콜 권한 확인 dest_root = self._find_root_for_path(destination) if not dest_root: error_msg = f'대상 경로가 허용되지 않음: {destination}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 소스 경로 삭제 권한 확인 if not self._check_permission(source_root, FileOperation.DELETE): error_msg = f'소스 경로 삭제 권한이 없습니다: {source}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 대상 경로 생성 권한 확인 if not self._check_permission(dest_root, FileOperation.CREATE): error_msg = f'대상 경로 생성 권한이 없습니다: {destination}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 소스 파일/디렉토리 존재 확인 if not os.path.exists(source): error_msg = f'소스가 존재하지 않습니다: {source}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 대상 경로 처리 destination = os.path.abspath(destination) dest_dir = os.path.dirname(destination) # 대상 디렉토리가 존재하지 않으면 생성 if not os.path.exists(dest_dir): try: os.makedirs(dest_dir, exist_ok=True) self.logger.info(f"대상 디렉토리 생성: {dest_dir}") except Exception as e: error_msg = f'대상 디렉토리 생성 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} # 대상 파일이 이미 존재하는 경우 처리 if os.path.exists(destination): if not overwrite: error_msg = f'대상 파일이 이미 존재합니다 (overwrite=False): {destination}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} else: self.logger.info(f"기존 대상 파일 덮어쓰기: {destination}") # 소스와 대상이 같은 경우 확인 if os.path.abspath(source) == destination: error_msg = f'소스와 대상이 동일합니다: {source}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 메타데이터 수집 (이동 전) source_metadata = self._get_file_metadata(source) is_directory = os.path.isdir(source) # 원자적 이동 수행 import shutil try: if is_directory: # 디렉토리 이동 if os.path.exists(destination) and overwrite: shutil.rmtree(destination) shutil.move(source, destination) operation_type = "directory_move" else: # 파일 이동 shutil.move(source, destination) operation_type = "file_move" # 대상 메타데이터 수집 (이동 후) dest_metadata = self._get_file_metadata(destination) # 감사 로그 기록 (성공) self._log_audit_event( operation=FileOperation.DELETE, # 소스에서 삭제 path=source, user=os.getenv('USER', 'unknown'), success=True ) self._log_audit_event( operation=FileOperation.CREATE, # 대상에서 생성 path=destination, user=os.getenv('USER', 'unknown'), success=True ) self.logger.info(f"파일 이동 성공: {source} -> {destination}") return { 'success': True, 'source': source, 'destination': destination, 'operation_type': operation_type, 'overwrite_occurred': os.path.exists(destination) and overwrite, 'source_metadata': asdict(source_metadata) if source_metadata else None, 'destination_metadata': asdict(dest_metadata) if dest_metadata else None } except Exception as e: error_msg = f'파일 이동 실패: {str(e)}' self.logger.error(error_msg) # 감사 로그 기록 (실패) self._log_audit_event( operation=FileOperation.DELETE, path=source, user=os.getenv('USER', 'unknown'), success=False, error_message=str(e) ) return {'success': False, 'error': error_msg} except Exception as e: error_msg = f'파일 이동 처리 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} def search_files(self, path: str, pattern: str, excludePatterns: Optional[List[str]] = None) -> Dict[str, Any]: """파일 검색 (핵심 기능 10)""" try: self.logger.info(f"파일 검색 시도: {path}, 패턴: {pattern}") # Roots 프로토콜 권한 확인 root_info = self._find_root_for_path(path) if not root_info: error_msg = f'경로가 허용되지 않음: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 읽기 권한 확인 if not self._check_permission(root_info, FileOperation.READ): error_msg = f'읽기 권한이 없습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 경로 존재 확인 if not os.path.exists(path): error_msg = f'경로가 존재하지 않습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 디렉토리 확인 if not os.path.isdir(path): error_msg = f'검색 경로는 디렉토리여야 합니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 제외 패턴 기본값 설정 if excludePatterns is None: excludePatterns = ['.*', '__pycache__', '*.pyc', '*.pyo', 'node_modules', '.git'] found_files = [] found_directories = [] # 재귀적 검색 for root, dirs, files in os.walk(path): try: # 제외 패턴에 따라 디렉토리 필터링 dirs[:] = [d for d in dirs if not any(fnmatch.fnmatch(d, pattern) for pattern in excludePatterns)] # 파일 검색 for file in files: # 제외 패턴 확인 if any(fnmatch.fnmatch(file, exc_pattern) for exc_pattern in excludePatterns): continue # 패턴 매치 확인 (대소문자 무시) if fnmatch.fnmatch(file.lower(), pattern.lower()): file_path = os.path.join(root, file) relative_path = os.path.relpath(file_path, path) # 파일 메타데이터 수집 try: file_stat = os.stat(file_path) found_files.append({ 'path': file_path, 'relative_path': relative_path, 'name': file, 'size': file_stat.st_size, 'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(), 'type': 'file' }) except OSError: # 권한 부족이나 삭제된 파일 등의 경우 continue # 디렉토리 검색 for dir_name in dirs: if fnmatch.fnmatch(dir_name.lower(), pattern.lower()): dir_path = os.path.join(root, dir_name) relative_path = os.path.relpath(dir_path, path) # 디렉토리 메타데이터 수집 try: dir_stat = os.stat(dir_path) found_directories.append({ 'path': dir_path, 'relative_path': relative_path, 'name': dir_name, 'modified': datetime.fromtimestamp(dir_stat.st_mtime).isoformat(), 'type': 'directory' }) except OSError: continue except PermissionError: # 권한이 없는 디렉토리는 건너뛰기 continue # 결과 정렬 (이름순) found_files.sort(key=lambda x: x['name'].lower()) found_directories.sort(key=lambda x: x['name'].lower()) self.logger.info(f"파일 검색 완료: {len(found_files)}개 파일, {len(found_directories)}개 디렉토리") return { 'success': True, 'search_path': path, 'pattern': pattern, 'exclude_patterns': excludePatterns, 'files': found_files, 'directories': found_directories, 'total_files': len(found_files), 'total_directories': len(found_directories) } except Exception as e: error_msg = f'파일 검색 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} def get_file_info(self, path: str) -> Dict[str, Any]: """파일 정보 조회 (핵심 기능 11)""" try: self.logger.info(f"파일 정보 조회: {path}") # Roots 프로토콜 권한 확인 root_info = self._find_root_for_path(path) if not root_info: error_msg = f'경로가 허용되지 않음: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 읽기 권한 확인 if not self._check_permission(root_info, FileOperation.READ): error_msg = f'읽기 권한이 없습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 경로 존재 확인 if not os.path.exists(path): error_msg = f'경로가 존재하지 않습니다: {path}' self.logger.warning(error_msg) return {'success': False, 'error': error_msg} # 파일 통계 정보 수집 stat_info = os.stat(path) # 기본 정보 file_info = { 'success': True, 'path': path, 'name': os.path.basename(path), 'absolute_path': os.path.abspath(path), 'exists': True, 'type': 'directory' if os.path.isdir(path) else 'file', 'size': stat_info.st_size, 'created': datetime.fromtimestamp(stat_info.st_ctime).isoformat(), 'modified': datetime.fromtimestamp(stat_info.st_mtime).isoformat(), 'accessed': datetime.fromtimestamp(stat_info.st_atime).isoformat(), 'permissions': { 'readable': os.access(path, os.R_OK), 'writable': os.access(path, os.W_OK), 'executable': os.access(path, os.X_OK), 'mode': oct(stat_info.st_mode)[-3:] # rwx permissions }, 'owner': { 'uid': stat_info.st_uid, 'gid': stat_info.st_gid } } # 파일별 추가 정보 if os.path.isfile(path): # MIME 타입 추정 mime_type, encoding = mimetypes.guess_type(path) file_info['mime_type'] = mime_type file_info['encoding'] = encoding # 파일 확장자 file_info['extension'] = os.path.splitext(path)[1].lower() # 해시 계산 (작은 파일만) if stat_info.st_size < 1024 * 1024: # 1MB 미만 try: with open(path, 'rb') as f: content = f.read() file_info['checksums'] = { 'md5': hashlib.md5(content).hexdigest(), 'sha256': hashlib.sha256(content).hexdigest() } except Exception: file_info['checksums'] = None else: file_info['checksums'] = None # 텍스트 파일 여부 확인 try: with open(path, 'r', encoding='utf-8') as f: f.read(100) # 첫 100문자만 읽어서 텍스트 파일인지 확인 file_info['is_text'] = True except (UnicodeDecodeError, PermissionError): file_info['is_text'] = False elif os.path.isdir(path): # 디렉토리 내용 개수 try: contents = os.listdir(path) files = [f for f in contents if os.path.isfile(os.path.join(path, f))] dirs = [d for d in contents if os.path.isdir(os.path.join(path, d))] file_info['directory_info'] = { 'total_items': len(contents), 'file_count': len(files), 'directory_count': len(dirs), 'is_empty': len(contents) == 0 } except PermissionError: file_info['directory_info'] = {'error': 'Permission denied'} # 심볼릭 링크 정보 if os.path.islink(path): file_info['is_symlink'] = True file_info['symlink_target'] = os.readlink(path) file_info['symlink_target_exists'] = os.path.exists(os.readlink(path)) else: file_info['is_symlink'] = False self.logger.info(f"파일 정보 조회 완료: {path}") return file_info except Exception as e: error_msg = f'파일 정보 조회 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg, 'path': path} def list_allowed_directories(self) -> Dict[str, Any]: """허용된 디렉토리 목록 조회 (핵심 기능 12)""" try: self.logger.info("허용된 디렉토리 목록 조회") # Roots 정보를 기반으로 허용된 디렉토리 수집 allowed_dirs = [] for root_name, root_config in self.roots.items(): root_path = root_config.path permission_level = root_config.permission_level # 경로 존재 확인 exists = os.path.exists(root_path) accessible = False if exists: try: # 접근 가능성 확인 accessible = os.access(root_path, os.R_OK) # 디렉토리 정보 수집 stat_info = os.stat(root_path) dir_info = { 'root_name': root_name, 'path': root_path, 'name': os.path.basename(root_path) or root_path, 'absolute_path': os.path.abspath(root_path), 'permission_level': permission_level.value, 'exists': True, 'accessible': accessible, 'is_directory': os.path.isdir(root_path), 'created': datetime.fromtimestamp(stat_info.st_ctime).isoformat(), 'modified': datetime.fromtimestamp(stat_info.st_mtime).isoformat(), 'permissions': { 'readable': os.access(root_path, os.R_OK), 'writable': os.access(root_path, os.W_OK), 'executable': os.access(root_path, os.X_OK) } } # 디렉토리인 경우 내용 개수 확인 if os.path.isdir(root_path): try: contents = os.listdir(root_path) dir_info['item_count'] = len(contents) except PermissionError: dir_info['item_count'] = None else: dir_info['item_count'] = None except OSError as e: dir_info = { 'root_name': root_name, 'path': root_path, 'name': os.path.basename(root_path) or root_path, 'permission_level': permission_level.value, 'exists': True, 'accessible': False, 'error': f'통계 정보 수집 실패: {str(e)}' } else: dir_info = { 'root_name': root_name, 'path': root_path, 'name': os.path.basename(root_path) or root_path, 'permission_level': permission_level.value, 'exists': False, 'accessible': False, 'error': '경로가 존재하지 않음' } allowed_dirs.append(dir_info) # 권한 레벨별 통계 permission_stats = {} for permission_level in PermissionLevel: count = sum(1 for d in allowed_dirs if d.get('permission_level') == permission_level.value) permission_stats[permission_level.value] = count # 접근 가능한 디렉토리 통계 accessible_count = sum(1 for d in allowed_dirs if d.get('accessible', False)) existing_count = sum(1 for d in allowed_dirs if d.get('exists', False)) self.logger.info(f"허용된 디렉토리 목록 조회 완료: 총 {len(allowed_dirs)}개") return { 'success': True, 'total_directories': len(allowed_dirs), 'accessible_directories': accessible_count, 'existing_directories': existing_count, 'permission_statistics': permission_stats, 'directories': allowed_dirs, 'timestamp': datetime.now().isoformat() } except Exception as e: error_msg = f'허용된 디렉토리 목록 조회 실패: {str(e)}' self.logger.error(error_msg) return {'success': False, 'error': error_msg} def cleanup(self): """리소스 정리""" try: if os.path.exists(self.db_path): os.remove(self.db_path) self.logger.info("CompleteFilesystemMCP 정리 완료") except Exception as e: self.logger.error(f"리소스 정리 실패: {e}") def test_complete_filesystem_mcp(): """테스트 함수""" print("=== Complete Filesystem MCP 테스트 시작 ===") # MCP 인스턴스 생성 fs_mcp = CompleteFilesystemMCP() try: # 테스트 파일 경로 test_file = "/home/skyki/qwen2.5/test_file.txt" test_dir = "/home/skyki/qwen2.5/test_directory" # 1. 파일 쓰기 테스트 print("\n=== 파일 쓰기 테스트 ===") write_result = fs_mcp.filesystem__write_file(test_file, "Hello, Complete Filesystem MCP!") print(f"쓰기 성공: {write_result['success']}") if write_result['success']: print(f"파일 크기: {write_result['size']} 바이트") # 2. 파일 읽기 테스트 print("\n=== 파일 읽기 테스트 ===") read_result = fs_mcp.filesystem__read_file(test_file) print(f"읽기 성공: {read_result['success']}") if read_result['success']: print(f"내용: {read_result['content'][:50]}...") # 3. 파일 편집 테스트 print("\n=== 파일 편집 테스트 ===") edit_result = fs_mcp.edit_file(test_file, "Hello", "Hi") print(f"편집 성공: {edit_result['success']}") if edit_result['success']: print(f"변경 횟수: {edit_result['changes_count']}") # 4. 디렉토리 생성 테스트 print("\n=== 디렉토리 생성 테스트 ===") dir_result = fs_mcp.create_directory(test_dir) print(f"디렉토리 생성 성공: {dir_result['success']}") # 5. 여러 파일 읽기 테스트 print("\n=== 여러 파일 읽기 테스트 ===") multi_result = fs_mcp.read_multiple_files([test_file, "/home/skyki/qwen2.5/project_plan.md"]) print(f"여러 파일 읽기 성공: {multi_result['successful_reads']}/{multi_result['total_files']}") # 6. 디렉토리 목록 조회 (기본) print("\n=== 디렉토리 목록 조회 테스트 ===") list_result = fs_mcp.filesystem__list_directory("/home/skyki/qwen2.5") print(f"목록 조회 성공: {list_result['success']}") if list_result['success']: print(f"항목 수: {list_result['count']}") print(f"첫 5개 항목: {[item['name'] for item in list_result['items'][:5]]}") # 7. 크기별 디렉토리 목록 조회 print("\n=== 크기별 디렉토리 목록 조회 테스트 ===") size_list_result = fs_mcp.list_directory_with_sizes("/home/skyki/qwen2.5", sort_by="size") print(f"크기별 목록 조회 성공: {size_list_result['success']}") if size_list_result['success']: print(f"총 항목 수: {size_list_result['count']}") print(f"총 크기: {size_list_result['total_size_display']}") print("크기 기준 상위 3개 항목:") for item in size_list_result['items'][:3]: print(f" - {item['name']}: {item.get('size_display', 'N/A')}") # 8. 디렉토리 트리 구조 조회 print("\n=== 디렉토리 트리 구조 조회 테스트 ===") tree_result = fs_mcp.directory_tree("/home/skyki/qwen2.5", max_depth=2, include_hidden=False) print(f"트리 조회 성공: {tree_result['success']}") if tree_result['success']: tree_data = tree_result['tree'] print(f"루트 디렉토리: {tree_data['name']}") print(f"직접 하위 항목 수: {tree_data['stats']['total_items']}") print(f"파일 수: {tree_data['stats']['total_files']}") print(f"디렉토리 수: {tree_data['stats']['total_directories']}") # 첫 번째 레벨의 몇 개 항목 표시 print("첫 번째 레벨 항목 예시:") for child in tree_data['children'][:3]: print(f" - {child['name']} ({child['type']})") # 9. 파일 이동/이름 변경 테스트 print("\n=== 파일 이동/이름 변경 테스트 ===") move_source = "/home/skyki/qwen2.5/test_file.txt" move_dest = "/home/skyki/qwen2.5/test_directory/moved_file.txt" move_result = fs_mcp.move_file(move_source, move_dest, overwrite=True) print(f"파일 이동 성공: {move_result['success']}") if move_result['success']: print(f"이동: {move_result['source']} -> {move_result['destination']}") print(f"작업 유형: {move_result['operation_type']}") print(f"덮어쓰기 발생: {move_result['overwrite_occurred']}") # 이동된 파일 확인 if os.path.exists(move_dest): print(f"이동된 파일 존재 확인: ✅") if not os.path.exists(move_source): print(f"원본 파일 삭제 확인: ✅") # Root 정보 확인 print("\n=== Root 정보 ===") print(f"등록된 Root 수: {len(fs_mcp.roots)}") for name, config in fs_mcp.roots.items(): print(f" - {name}: {config.path} ({config.permission_level.value})") # 정리 if os.path.exists(test_file): os.remove(test_file) # 이동된 파일 정리 moved_file = "/home/skyki/qwen2.5/test_directory/moved_file.txt" if os.path.exists(moved_file): os.remove(moved_file) if os.path.exists(test_dir): try: os.rmdir(test_dir) except OSError: # 디렉토리가 비어있지 않은 경우 import shutil shutil.rmtree(test_dir) finally: fs_mcp.cleanup() print("\n=== 테스트 완료 ===") if __name__ == "__main__": test_complete_filesystem_mcp()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server