#!/usr/bin/env python3
"""
Complete Filesystem MCP - 완전한 파일시스템 MCP 도구
mcp_principles/02-1_filesystem_mcp.md 기반 완전 구현
- 12개 핵심 기능 모두 구현
- Roots 프로토콜 완전 지원
- 원자적 쓰기 및 메타데이터 관리
- CLAUDE.md 지침 완전 반영
"""
import os
import sys
import sqlite3
import json
import hashlib
import mimetypes
import tempfile
import shutil
import threading
import time
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Union
from dataclasses import dataclass, asdict
from enum import Enum
import glob
import fnmatch
# 로컬 모델 임포트
from .models import (
PermissionLevel,
FileOperation,
RootConfig,
FileMetadata,
AuditEvent
)
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class CompleteFilesystemMCP:
"""완전한 파일시스템 MCP 구현"""
def __init__(self, db_path: Optional[str] = None):
"""초기화"""
# 로거 먼저 설정
self.logger = logging.getLogger("CompleteFilesystemMCP")
# DB 경로 설정
self.db_path = db_path or tempfile.mktemp(suffix='.db')
# Roots 설정
self.roots: Dict[str, RootConfig] = {}
# 감사 로그 저장소
self.audit_log: List[AuditEvent] = []
# 스레드 안전성을 위한 락
self.file_lock = threading.RLock()
# GPU 환경변수 설정 (CLAUDE.md 지침)
self._setup_gpu_environment()
# 데이터베이스 초기화
self._init_database()
# 기본 루트 추가
self._setup_default_roots()
self.logger.info(f"CompleteFilesystemMCP 초기화 완료: DB={self.db_path}")
def _setup_gpu_environment(self):
"""GPU 환경변수 설정 (CLAUDE.md 지침 반영)"""
try:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # 경고 숨김
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true' # 메모리 증분 할당
os.environ['CUDA_VISIBLE_DEVICES'] = '0' # GPU 0 사용
os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'
self.logger.debug("GPU 환경변수 설정 완료")
except Exception as e:
self.logger.warning(f"GPU 환경변수 설정 실패: {e}")
def _init_database(self):
"""데이터베이스 초기화"""
try:
with self.file_lock:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Roots 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS roots (
name TEXT PRIMARY KEY,
path TEXT NOT NULL,
permission_level TEXT NOT NULL,
allowed_operations TEXT NOT NULL,
config TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 파일 메타데이터 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS file_metadata (
path TEXT PRIMARY KEY,
name TEXT NOT NULL,
size INTEGER NOT NULL,
modified_time TIMESTAMP NOT NULL,
created_time TIMESTAMP NOT NULL,
mime_type TEXT NOT NULL,
permissions TEXT NOT NULL,
file_hash TEXT NOT NULL,
is_directory BOOLEAN NOT NULL,
is_symlink BOOLEAN NOT NULL,
symlink_target TEXT,
last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 감사 로그 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP NOT NULL,
operation TEXT NOT NULL,
path TEXT NOT NULL,
user TEXT NOT NULL,
success BOOLEAN NOT NULL,
file_size INTEGER,
error_message TEXT,
details TEXT
)
''')
conn.commit()
conn.close()
self.logger.info("데이터베이스 초기화 완료")
except Exception as e:
self.logger.error(f"데이터베이스 초기화 실패: {e}")
raise
def _setup_default_roots(self):
"""기본 Root 설정"""
try:
# 현재 작업 디렉토리
current_dir = os.getcwd()
default_root = RootConfig(
name="current_directory",
path=current_dir,
permission_level=PermissionLevel.READ_WRITE,
allowed_operations=[
FileOperation.READ,
FileOperation.WRITE,
FileOperation.CREATE,
FileOperation.DELETE,
FileOperation.MOVE,
FileOperation.SEARCH
],
max_file_size=50 * 1024 * 1024, # 50MB
audit_enabled=True
)
self.add_root(default_root)
# 홈 디렉토리 (읽기 전용)
home_dir = os.path.expanduser("~")
if home_dir != current_dir:
home_root = RootConfig(
name="home_directory",
path=home_dir,
permission_level=PermissionLevel.READ_ONLY,
allowed_operations=[
FileOperation.READ,
FileOperation.SEARCH
],
max_file_size=10 * 1024 * 1024, # 10MB
audit_enabled=True
)
self.add_root(home_root)
except Exception as e:
self.logger.warning(f"기본 Root 설정 실패: {e}")
def add_root(self, config: RootConfig) -> bool:
"""새로운 Root 경로 추가"""
try:
self.logger.info(f"Root 추가 시도: {config.name} -> {config.path}")
# 경로 존재 확인
if not os.path.exists(config.path):
self.logger.error(f"경로가 존재하지 않습니다: {config.path}")
return False
# 절대 경로로 변환
config.path = os.path.abspath(config.path)
# 권한 확인
if not os.access(config.path, os.R_OK):
self.logger.error(f"읽기 권한이 없습니다: {config.path}")
return False
with self.file_lock:
# 데이터베이스에 저장
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO roots
(name, path, permission_level, allowed_operations, config)
VALUES (?, ?, ?, ?, ?)
''', (
config.name,
config.path,
config.permission_level.value,
json.dumps([op.value for op in config.allowed_operations]),
json.dumps(config.to_dict())
))
conn.commit()
conn.close()
# 메모리에 저장
self.roots[config.name] = config
self.logger.info(f"Root '{config.name}' 추가 완료: {config.path}")
return True
except Exception as e:
self.logger.error(f"Root 추가 실패: {e}")
return False
def _calculate_file_hash(self, filepath: str) -> str:
"""파일 해시 계산"""
try:
hasher = hashlib.sha256()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hasher.update(chunk)
return hasher.hexdigest()
except Exception as e:
self.logger.error(f"파일 해시 계산 실패 {filepath}: {e}")
return ""
def _get_file_metadata(self, filepath: str) -> Optional[FileMetadata]:
"""파일 메타데이터 수집"""
try:
path_obj = Path(filepath)
if not path_obj.exists():
return None
stat_info = path_obj.stat()
# 파일 해시 계산 (디렉토리가 아닌 경우만)
file_hash = ""
if path_obj.is_file():
file_hash = self._calculate_file_hash(filepath)
# MIME 타입 감지
mime_type, _ = mimetypes.guess_type(filepath)
if not mime_type:
mime_type = "application/octet-stream" if path_obj.is_file() else "inode/directory"
# 심볼릭 링크 대상
symlink_target = None
if path_obj.is_symlink():
try:
symlink_target = str(path_obj.readlink())
except:
symlink_target = None
return FileMetadata(
path=filepath,
name=path_obj.name,
size=stat_info.st_size,
modified_time=datetime.fromtimestamp(stat_info.st_mtime),
created_time=datetime.fromtimestamp(stat_info.st_ctime),
mime_type=mime_type,
permissions=oct(stat_info.st_mode)[-3:],
file_hash=file_hash,
is_directory=path_obj.is_dir(),
is_symlink=path_obj.is_symlink(),
symlink_target=symlink_target
)
except Exception as e:
self.logger.error(f"파일 메타데이터 수집 실패 {filepath}: {e}")
return None
def _log_audit_event(self, operation: FileOperation, path: str,
user: str, success: bool, file_size: Optional[int] = None,
error_message: Optional[str] = None, details: Optional[Dict] = None):
"""감사 이벤트 로깅"""
try:
event = AuditEvent(
timestamp=datetime.now(),
operation=operation,
path=path,
user=user,
success=success,
file_size=file_size,
error_message=error_message,
details=details
)
# 메모리에 저장 (제한된 크기)
self.audit_log.append(event)
if len(self.audit_log) > 1000: # 최대 1000개 유지
self.audit_log.pop(0)
# 데이터베이스에 저장
with self.file_lock:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO audit_log
(timestamp, operation, path, user, success, file_size, error_message, details)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
event.timestamp,
event.operation.value,
event.path,
event.user,
event.success,
event.file_size,
event.error_message,
json.dumps(event.details) if event.details else None
))
conn.commit()
conn.close()
self.logger.debug(f"감사 이벤트 로깅: {operation.value} {path} {'성공' if success else '실패'}")
except Exception as e:
self.logger.error(f"감사 이벤트 로깅 실패: {e}")
def _check_permission(self, root_name: str, operation: FileOperation) -> bool:
"""권한 확인"""
if root_name not in self.roots:
self.logger.debug(f"Root '{root_name}' 찾을 수 없음")
return False
config = self.roots[root_name]
has_permission = operation in config.allowed_operations
self.logger.debug(f"권한 확인: {root_name} {operation.value} -> {has_permission}")
return has_permission
def _find_root_for_path(self, path: str) -> Optional[str]:
"""경로에 해당하는 Root 찾기"""
abs_path = os.path.abspath(path)
self.logger.debug(f"Root 찾기: {path} -> {abs_path}")
# 가장 구체적인 매치 찾기 (가장 긴 공통 경로)
best_match = None
best_match_length = 0
for root_name, config in self.roots.items():
self.logger.debug(f" 확인: {config.path} 하위인가?")
if abs_path.startswith(config.path):
match_length = len(config.path)
if match_length > best_match_length:
best_match = root_name
best_match_length = match_length
self.logger.debug(f" 새로운 최적 매치: {root_name} (길이: {match_length})")
if best_match:
self.logger.debug(f" 최종 매치: {best_match}")
else:
self.logger.debug(" 매치되는 Root 없음")
return best_match
def _check_file_size_limit(self, filepath: str, root_name: str) -> bool:
"""파일 크기 제한 확인"""
try:
if not os.path.exists(filepath):
return True # 새 파일은 크기 제한 없음
file_size = os.path.getsize(filepath)
max_size = self.roots[root_name].max_file_size
if file_size > max_size:
self.logger.warning(f"파일 크기 제한 초과: {file_size} > {max_size}")
return False
return True
except Exception as e:
self.logger.error(f"파일 크기 확인 실패: {e}")
return False
# ========================================
# 12개 핵심 기능 구현
# ========================================
def filesystem__read_file(self, path: str, encoding: str = 'utf-8', head: Optional[int] = None, tail: Optional[int] = None) -> Dict[str, Any]:
"""파일 읽기 (핵심 기능 1)"""
try:
self.logger.info(f"파일 읽기 시도: {path}")
# 절대 경로로 변환
abs_path = os.path.abspath(path)
# Root 찾기 및 권한 확인
root_name = self._find_root_for_path(abs_path)
if not root_name:
error_msg = f'허용되지 않은 경로: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
if not self._check_permission(root_name, FileOperation.READ):
error_msg = f'읽기 권한이 없습니다: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 파일 존재 확인
if not os.path.exists(abs_path):
error_msg = f'파일이 존재하지 않습니다: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
if not os.path.isfile(abs_path):
error_msg = f'디렉토리입니다: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 파일 크기 확인
if not self._check_file_size_limit(abs_path, root_name):
error_msg = f'파일 크기 제한 초과: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 파일 읽기
with self.file_lock:
with open(abs_path, 'r', encoding=encoding) as f:
if head and tail:
# head와 tail 둘 다 지정된 경우 에러
error_msg = 'head와 tail은 동시에 사용할 수 없습니다'
return {'success': False, 'error': error_msg}
elif head:
lines = []
for i, line in enumerate(f):
if i >= head:
break
lines.append(line)
content = ''.join(lines)
elif tail:
# 모든 라인을 읽고 마지막 N개만 가져오기
all_lines = f.readlines()
content = ''.join(all_lines[-tail:]) if len(all_lines) >= tail else ''.join(all_lines)
else:
content = f.read()
# 메타데이터 수집
metadata = self._get_file_metadata(abs_path)
# 감사 로그 기록
self._log_audit_event(
operation=FileOperation.READ,
path=abs_path,
user=os.getenv('USER', 'unknown'),
success=True,
file_size=len(content),
details={'encoding': encoding, 'head': head, 'tail': tail}
)
self.logger.info(f"파일 읽기 성공: {len(content)} 문자")
return {
'success': True,
'content': content,
'metadata': asdict(metadata) if metadata else None,
'encoding': encoding,
'path': abs_path
}
except Exception as e:
error_msg = f'파일 읽기 실패: {str(e)}'
self.logger.error(error_msg)
self._log_audit_event(FileOperation.READ, path, os.getenv('USER', 'unknown'), False, error_message=str(e))
return {'success': False, 'error': error_msg}
def read_multiple_files(self, paths: List[str], encoding: str = 'utf-8') -> Dict[str, Any]:
"""여러 파일 읽기 (핵심 기능 2)"""
try:
self.logger.info(f"여러 파일 읽기 시도: {len(paths)}개 파일")
results = {}
successful_reads = 0
for path in paths:
result = self.filesystem__read_file(path, encoding)
results[path] = result
if result['success']:
successful_reads += 1
return {
'success': True,
'results': results,
'total_files': len(paths),
'successful_reads': successful_reads,
'failed_reads': len(paths) - successful_reads
}
except Exception as e:
error_msg = f'여러 파일 읽기 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
def filesystem__write_file(self, path: str, content: str, encoding: str = 'utf-8', create_dirs: bool = True) -> Dict[str, Any]:
"""파일 쓰기 (핵심 기능 3) - 원자적 쓰기"""
try:
self.logger.info(f"파일 쓰기 시도: {path}")
# 절대 경로로 변환
abs_path = os.path.abspath(path)
file_path = Path(abs_path)
# 디렉토리 생성 필요 시
if create_dirs and not file_path.parent.exists():
file_path.parent.mkdir(parents=True, exist_ok=True)
# Root 찾기 및 권한 확인
root_name = self._find_root_for_path(abs_path)
if not root_name:
error_msg = f'허용되지 않은 경로: {abs_path}'
self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
operation = FileOperation.CREATE if not file_path.exists() else FileOperation.WRITE
if not self._check_permission(root_name, operation):
error_msg = f'쓰기 권한이 없습니다: {abs_path}'
self._log_audit_event(operation, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 내용 크기 확인
content_size = len(content.encode(encoding))
max_size = self.roots[root_name].max_file_size
if content_size > max_size:
error_msg = f'내용 크기가 제한을 초과합니다: {content_size} > {max_size}'
self._log_audit_event(operation, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 원자적 쓰기 (임시 파일 사용)
temp_path = file_path.with_suffix(file_path.suffix + '.tmp')
with self.file_lock:
# 임시 파일에 쓰기
with open(temp_path, 'w', encoding=encoding) as f:
f.write(content)
f.flush()
os.fsync(f.fileno()) # 디스크에 강제 쓰기
# 원자적 이동
shutil.move(temp_path, file_path)
# 메타데이터 수집
metadata = self._get_file_metadata(abs_path)
# 감사 로그 기록
self._log_audit_event(
operation=operation,
path=abs_path,
user=os.getenv('USER', 'unknown'),
success=True,
file_size=content_size,
details={'encoding': encoding, 'create_dirs': create_dirs}
)
self.logger.info(f"파일 쓰기 성공: {content_size} 바이트")
return {
'success': True,
'path': abs_path,
'size': content_size,
'metadata': asdict(metadata) if metadata else None,
'encoding': encoding
}
except Exception as e:
# 임시 파일 정리
temp_path = Path(path).with_suffix(Path(path).suffix + '.tmp')
if temp_path.exists():
try:
temp_path.unlink()
except:
pass
error_msg = f'파일 쓰기 실패: {str(e)}'
self.logger.error(error_msg)
self._log_audit_event(FileOperation.WRITE, path, os.getenv('USER', 'unknown'), False, error_message=str(e))
return {'success': False, 'error': error_msg}
def edit_file(self, path: str, old_text: str, new_text: str, encoding: str = 'utf-8') -> Dict[str, Any]:
"""파일 편집 (핵심 기능 4) - 텍스트 교체"""
try:
self.logger.info(f"파일 편집 시도: {path}")
# 절대 경로로 변환
abs_path = os.path.abspath(path)
# Root 찾기 및 권한 확인
root_name = self._find_root_for_path(abs_path)
if not root_name:
error_msg = f'허용되지 않은 경로: {abs_path}'
self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
if not self._check_permission(root_name, FileOperation.WRITE):
error_msg = f'편집 권한이 없습니다: {abs_path}'
self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 파일 존재 확인
if not os.path.exists(abs_path):
error_msg = f'파일이 존재하지 않습니다: {abs_path}'
self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 파일 읽기
with self.file_lock:
with open(abs_path, 'r', encoding=encoding) as f:
original_content = f.read()
# 텍스트 교체
if old_text not in original_content:
error_msg = f'교체할 텍스트를 찾을 수 없습니다: {old_text[:50]}...'
self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
new_content = original_content.replace(old_text, new_text)
changes_count = original_content.count(old_text)
# 변경사항이 있는지 확인
if new_content == original_content:
return {
'success': True,
'path': abs_path,
'changes_made': False,
'changes_count': 0,
'message': '변경사항이 없습니다'
}
# 크기 제한 확인
new_content_size = len(new_content.encode(encoding))
max_size = self.roots[root_name].max_file_size
if new_content_size > max_size:
error_msg = f'편집 후 파일 크기가 제한을 초과합니다: {new_content_size} > {max_size}'
self._log_audit_event(FileOperation.WRITE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 원자적 쓰기
file_path = Path(abs_path)
temp_path = file_path.with_suffix(file_path.suffix + '.tmp')
with self.file_lock:
with open(temp_path, 'w', encoding=encoding) as f:
f.write(new_content)
f.flush()
os.fsync(f.fileno())
shutil.move(temp_path, file_path)
# 메타데이터 수집
metadata = self._get_file_metadata(abs_path)
# 감사 로그 기록
self._log_audit_event(
operation=FileOperation.WRITE,
path=abs_path,
user=os.getenv('USER', 'unknown'),
success=True,
file_size=new_content_size,
details={
'edit_type': 'text_replace',
'changes_count': changes_count,
'old_size': len(original_content),
'new_size': len(new_content)
}
)
self.logger.info(f"파일 편집 성공: {changes_count}곳 변경")
return {
'success': True,
'path': abs_path,
'changes_made': True,
'changes_count': changes_count,
'original_size': len(original_content),
'new_size': len(new_content),
'metadata': asdict(metadata) if metadata else None
}
except Exception as e:
# 임시 파일 정리
temp_path = Path(path).with_suffix(Path(path).suffix + '.tmp')
if temp_path.exists():
try:
temp_path.unlink()
except:
pass
error_msg = f'파일 편집 실패: {str(e)}'
self.logger.error(error_msg)
self._log_audit_event(FileOperation.WRITE, path, os.getenv('USER', 'unknown'), False, error_message=str(e))
return {'success': False, 'error': error_msg}
def create_directory(self, path: str, parents: bool = True, exist_ok: bool = True) -> Dict[str, Any]:
"""디렉토리 생성 (핵심 기능 5)"""
try:
self.logger.info(f"디렉토리 생성 시도: {path}")
# 절대 경로로 변환
abs_path = os.path.abspath(path)
dir_path = Path(abs_path)
# Root 찾기 및 권한 확인
root_name = self._find_root_for_path(abs_path)
if not root_name:
error_msg = f'허용되지 않은 경로: {abs_path}'
self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
if not self._check_permission(root_name, FileOperation.CREATE):
error_msg = f'생성 권한이 없습니다: {abs_path}'
self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 이미 존재하는지 확인
if dir_path.exists():
if dir_path.is_dir():
if exist_ok:
self.logger.info(f"디렉토리가 이미 존재함: {abs_path}")
metadata = self._get_file_metadata(abs_path)
return {
'success': True,
'path': abs_path,
'created': False,
'message': '디렉토리가 이미 존재합니다',
'metadata': asdict(metadata) if metadata else None
}
else:
error_msg = f'디렉토리가 이미 존재합니다: {abs_path}'
self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
else:
error_msg = f'파일이 이미 존재합니다 (디렉토리가 아님): {abs_path}'
self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 디렉토리 생성
with self.file_lock:
try:
dir_path.mkdir(parents=parents, exist_ok=exist_ok)
created = True
except FileExistsError:
if exist_ok:
created = False
else:
raise
except FileNotFoundError:
if not parents:
error_msg = f'상위 디렉토리가 존재하지 않습니다: {abs_path}'
self._log_audit_event(FileOperation.CREATE, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
else:
raise
# 메타데이터 수집
metadata = self._get_file_metadata(abs_path)
# 감사 로그 기록
self._log_audit_event(
operation=FileOperation.CREATE,
path=abs_path,
user=os.getenv('USER', 'unknown'),
success=True,
details={
'type': 'directory',
'parents': parents,
'exist_ok': exist_ok,
'created': created
}
)
self.logger.info(f"디렉토리 생성 성공: {abs_path}")
return {
'success': True,
'path': abs_path,
'created': created,
'metadata': asdict(metadata) if metadata else None
}
except PermissionError:
error_msg = f'디렉토리 생성 권한이 없습니다: {path}'
self.logger.error(error_msg)
self._log_audit_event(FileOperation.CREATE, path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
except Exception as e:
error_msg = f'디렉토리 생성 실패: {str(e)}'
self.logger.error(error_msg)
self._log_audit_event(FileOperation.CREATE, path, os.getenv('USER', 'unknown'), False, error_message=str(e))
return {'success': False, 'error': error_msg}
def filesystem__list_directory(self, path: str, show_hidden: bool = False) -> Dict[str, Any]:
"""디렉토리 목록 조회 (핵심 기능 6)"""
try:
self.logger.info(f"디렉토리 목록 조회: {path}")
# 절대 경로로 변환
abs_path = os.path.abspath(path)
# Root 찾기 및 권한 확인
root_name = self._find_root_for_path(abs_path)
if not root_name:
error_msg = f'허용되지 않은 경로: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
if not self._check_permission(root_name, FileOperation.READ):
error_msg = f'읽기 권한이 없습니다: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 디렉토리 존재 및 타입 확인
if not os.path.exists(abs_path):
error_msg = f'디렉토리가 존재하지 않습니다: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
if not os.path.isdir(abs_path):
error_msg = f'디렉토리가 아닙니다: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 디렉토리 내용 조회
items = []
with self.file_lock:
try:
for item_name in os.listdir(abs_path):
# 숨김 파일 처리
if not show_hidden and item_name.startswith('.'):
continue
item_path = os.path.join(abs_path, item_name)
try:
# 기본 정보
item_info = {
'name': item_name,
'path': item_path,
'type': '[DIR]' if os.path.isdir(item_path) else '[FILE]'
}
# 메타데이터 수집 (선택적)
metadata = self._get_file_metadata(item_path)
if metadata:
item_info.update({
'size': metadata.size,
'modified': metadata.modified_time.isoformat(),
'permissions': metadata.permissions,
'is_symlink': metadata.is_symlink
})
if metadata.is_symlink and metadata.symlink_target:
item_info['symlink_target'] = metadata.symlink_target
items.append(item_info)
except (PermissionError, OSError) as e:
# 개별 파일 접근 실패는 경고만 로그
self.logger.warning(f"항목 정보 수집 실패 {item_path}: {e}")
items.append({
'name': item_name,
'path': item_path,
'type': '[UNKNOWN]',
'error': str(e)
})
continue
except PermissionError:
error_msg = f'디렉토리 읽기 권한이 없습니다: {abs_path}'
self._log_audit_event(FileOperation.READ, abs_path, os.getenv('USER', 'unknown'), False, error_message=error_msg)
return {'success': False, 'error': error_msg}
# 정렬 (디렉토리 먼저, 그 다음 파일명순)
items.sort(key=lambda x: (x['type'] != '[DIR]', x['name'].lower()))
# 감사 로그 기록
self._log_audit_event(
operation=FileOperation.READ,
path=abs_path,
user=os.getenv('USER', 'unknown'),
success=True,
details={
'type': 'directory_listing',
'item_count': len(items),
'show_hidden': show_hidden
}
)
self.logger.info(f"디렉토리 목록 조회 성공: {len(items)}개 항목")
return {
'success': True,
'path': abs_path,
'items': items,
'count': len(items),
'show_hidden': show_hidden
}
except Exception as e:
error_msg = f'디렉토리 목록 조회 실패: {str(e)}'
self.logger.error(error_msg)
self._log_audit_event(FileOperation.READ, path, os.getenv('USER', 'unknown'), False, error_message=str(e))
return {'success': False, 'error': error_msg}
def list_directory_with_sizes(self, path: str, sort_by: str = 'name', show_hidden: bool = False) -> Dict[str, Any]:
"""크기를 포함한 디렉토리 목록 조회 (핵심 기능 7)"""
try:
self.logger.info(f"크기별 디렉토리 목록 조회: {path}")
# 기본 디렉토리 목록 조회
base_result = self.filesystem__list_directory(path, show_hidden)
if not base_result['success']:
return base_result
items = base_result['items']
# 크기 정보 추가 및 강화
enhanced_items = []
total_size = 0
for item in items:
enhanced_item = item.copy()
try:
item_path = item['path']
if item['type'] == '[DIR]':
# 디렉토리의 경우 하위 항목 개수 계산
try:
subitem_count = len([f for f in os.listdir(item_path)
if show_hidden or not f.startswith('.')])
enhanced_item['subitem_count'] = subitem_count
enhanced_item['size_display'] = f"{subitem_count} items"
except (PermissionError, OSError):
enhanced_item['subitem_count'] = 0
enhanced_item['size_display'] = "No access"
else:
# 파일의 경우 크기를 사람이 읽기 쉬운 형태로 변환
file_size = item.get('size', 0)
enhanced_item['size_display'] = self._format_file_size(file_size)
total_size += file_size
except Exception as e:
self.logger.warning(f"항목 크기 정보 수집 실패 {item['path']}: {e}")
enhanced_item['size_display'] = "Unknown"
enhanced_items.append(enhanced_item)
# 정렬
if sort_by == 'size':
# 크기순 정렬 (디렉토리는 하위 항목 수, 파일은 크기)
enhanced_items.sort(key=lambda x: (
x['type'] != '[FILE]', # 파일 우선
-(x.get('size', 0) if x['type'] == '[FILE]' else x.get('subitem_count', 0))
))
elif sort_by == 'name':
# 이름순 정렬 (기본)
enhanced_items.sort(key=lambda x: (x['type'] != '[DIR]', x['name'].lower()))
elif sort_by == 'modified':
# 수정일순 정렬
enhanced_items.sort(key=lambda x: x.get('modified', ''), reverse=True)
# 감사 로그 기록
self._log_audit_event(
operation=FileOperation.READ,
path=path,
user=os.getenv('USER', 'unknown'),
success=True,
details={
'type': 'directory_listing_with_sizes',
'sort_by': sort_by,
'total_size': total_size,
'item_count': len(enhanced_items)
}
)
self.logger.info(f"크기별 디렉토리 목록 조회 성공: {len(enhanced_items)}개 항목")
return {
'success': True,
'path': path,
'items': enhanced_items,
'count': len(enhanced_items),
'total_size': total_size,
'total_size_display': self._format_file_size(total_size),
'sort_by': sort_by,
'show_hidden': show_hidden
}
except Exception as e:
error_msg = f'크기별 디렉토리 목록 조회 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
def _format_file_size(self, size_bytes: int) -> str:
"""파일 크기를 사람이 읽기 쉬운 형태로 변환"""
if size_bytes == 0:
return "0 B"
units = ['B', 'KB', 'MB', 'GB', 'TB']
unit_index = 0
size = float(size_bytes)
while size >= 1024 and unit_index < len(units) - 1:
size /= 1024
unit_index += 1
if unit_index == 0:
return f"{int(size)} {units[unit_index]}"
else:
return f"{size:.1f} {units[unit_index]}"
def directory_tree(self, path: str, max_depth: int = 10, include_hidden: bool = False) -> Dict[str, Any]:
"""디렉토리 트리 구조 조회 (핵심 기능 8)"""
try:
self.logger.info(f"디렉토리 트리 조회 시작: {path}")
# Roots 프로토콜 권한 확인
root_name = self._find_root_for_path(path)
if not root_name:
error_msg = f'허용되지 않은 경로: {path}'
self.logger.warning(error_msg)
self._log_audit_event(
operation=FileOperation.READ,
path=path,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=error_msg
)
return {'success': False, 'error': error_msg}
# 읽기 권한 확인
if not self._check_permission(root_name, FileOperation.READ):
error_msg = f'읽기 권한이 없습니다: {path}'
self.logger.warning(error_msg)
self._log_audit_event(
operation=FileOperation.READ,
path=path,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=error_msg
)
return {'success': False, 'error': error_msg}
# 경로 존재 확인
if not os.path.exists(path):
error_msg = f'경로가 존재하지 않습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 디렉토리 확인
if not os.path.isdir(path):
error_msg = f'디렉토리가 아닙니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 재귀적으로 트리 구조 생성
tree_data = self._build_directory_tree(path, max_depth, include_hidden, current_depth=0)
# 감사 로그 기록
self._log_audit_event(
operation=FileOperation.READ,
path=path,
user=os.getenv('USER', 'unknown'),
success=True
)
self.logger.info(f"디렉토리 트리 조회 완료: {path}")
return {
'success': True,
'path': path,
'tree': tree_data,
'max_depth': max_depth,
'include_hidden': include_hidden
}
except Exception as e:
error_msg = f'디렉토리 트리 조회 실패: {str(e)}'
self.logger.error(error_msg)
self._log_audit_event(
operation=FileOperation.READ,
path=path,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=str(e)
)
return {'success': False, 'error': error_msg}
def _build_directory_tree(self, path: str, max_depth: int, include_hidden: bool, current_depth: int = 0) -> Dict[str, Any]:
"""재귀적으로 디렉토리 트리 구조 생성"""
try:
tree_node = {
'name': os.path.basename(path),
'type': 'directory',
'path': path,
'children': []
}
# 최대 깊이 확인
if current_depth >= max_depth:
tree_node['truncated'] = True
return tree_node
# 디렉토리 내용 읽기
try:
entries = os.listdir(path)
except PermissionError:
tree_node['error'] = 'Permission denied'
return tree_node
# 정렬
entries.sort()
for entry in entries:
# 숨김 파일 처리
if not include_hidden and entry.startswith('.'):
continue
entry_path = os.path.join(path, entry)
try:
if os.path.isdir(entry_path):
# 하위 디렉토리 재귀 호출
child_tree = self._build_directory_tree(
entry_path, max_depth, include_hidden, current_depth + 1
)
tree_node['children'].append(child_tree)
else:
# 파일 노드 생성
file_node = {
'name': entry,
'type': 'file',
'path': entry_path
}
# 파일 크기 추가 (선택적)
try:
stat_info = os.stat(entry_path)
file_node['size'] = stat_info.st_size
file_node['size_display'] = self._format_file_size(stat_info.st_size)
file_node['modified'] = datetime.fromtimestamp(stat_info.st_mtime).isoformat()
except (OSError, IOError):
pass
tree_node['children'].append(file_node)
except (OSError, IOError) as e:
# 접근할 수 없는 항목은 오류 노드로 추가
error_node = {
'name': entry,
'type': 'error',
'path': entry_path,
'error': str(e)
}
tree_node['children'].append(error_node)
# 통계 정보 추가
total_files = sum(1 for child in tree_node['children'] if child['type'] == 'file')
total_dirs = sum(1 for child in tree_node['children'] if child['type'] == 'directory')
tree_node['stats'] = {
'total_files': total_files,
'total_directories': total_dirs,
'total_items': len(tree_node['children'])
}
return tree_node
except Exception as e:
self.logger.error(f"트리 구조 생성 실패 {path}: {e}")
return {
'name': os.path.basename(path),
'type': 'error',
'path': path,
'error': str(e)
}
def move_file(self, source: str, destination: str, overwrite: bool = False) -> Dict[str, Any]:
"""파일 이동/이름 변경 (핵심 기능 9)"""
try:
self.logger.info(f"파일 이동 시도: {source} -> {destination}")
# 소스 경로 Roots 프로토콜 권한 확인
source_root = self._find_root_for_path(source)
if not source_root:
error_msg = f'소스 경로가 허용되지 않음: {source}'
self.logger.warning(error_msg)
self._log_audit_event(
operation=FileOperation.DELETE, # 이동은 삭제+생성으로 간주
path=source,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=error_msg
)
return {'success': False, 'error': error_msg}
# 대상 경로 Roots 프로토콜 권한 확인
dest_root = self._find_root_for_path(destination)
if not dest_root:
error_msg = f'대상 경로가 허용되지 않음: {destination}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 소스 경로 삭제 권한 확인
if not self._check_permission(source_root, FileOperation.DELETE):
error_msg = f'소스 경로 삭제 권한이 없습니다: {source}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 대상 경로 생성 권한 확인
if not self._check_permission(dest_root, FileOperation.CREATE):
error_msg = f'대상 경로 생성 권한이 없습니다: {destination}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 소스 파일/디렉토리 존재 확인
if not os.path.exists(source):
error_msg = f'소스가 존재하지 않습니다: {source}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 대상 경로 처리
destination = os.path.abspath(destination)
dest_dir = os.path.dirname(destination)
# 대상 디렉토리가 존재하지 않으면 생성
if not os.path.exists(dest_dir):
try:
os.makedirs(dest_dir, exist_ok=True)
self.logger.info(f"대상 디렉토리 생성: {dest_dir}")
except Exception as e:
error_msg = f'대상 디렉토리 생성 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
# 대상 파일이 이미 존재하는 경우 처리
if os.path.exists(destination):
if not overwrite:
error_msg = f'대상 파일이 이미 존재합니다 (overwrite=False): {destination}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
else:
self.logger.info(f"기존 대상 파일 덮어쓰기: {destination}")
# 소스와 대상이 같은 경우 확인
if os.path.abspath(source) == destination:
error_msg = f'소스와 대상이 동일합니다: {source}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 메타데이터 수집 (이동 전)
source_metadata = self._get_file_metadata(source)
is_directory = os.path.isdir(source)
# 원자적 이동 수행
import shutil
try:
if is_directory:
# 디렉토리 이동
if os.path.exists(destination) and overwrite:
shutil.rmtree(destination)
shutil.move(source, destination)
operation_type = "directory_move"
else:
# 파일 이동
shutil.move(source, destination)
operation_type = "file_move"
# 대상 메타데이터 수집 (이동 후)
dest_metadata = self._get_file_metadata(destination)
# 감사 로그 기록 (성공)
self._log_audit_event(
operation=FileOperation.DELETE, # 소스에서 삭제
path=source,
user=os.getenv('USER', 'unknown'),
success=True
)
self._log_audit_event(
operation=FileOperation.CREATE, # 대상에서 생성
path=destination,
user=os.getenv('USER', 'unknown'),
success=True
)
self.logger.info(f"파일 이동 성공: {source} -> {destination}")
return {
'success': True,
'source': source,
'destination': destination,
'operation_type': operation_type,
'overwrite_occurred': os.path.exists(destination) and overwrite,
'source_metadata': asdict(source_metadata) if source_metadata else None,
'destination_metadata': asdict(dest_metadata) if dest_metadata else None
}
except Exception as e:
error_msg = f'파일 이동 실패: {str(e)}'
self.logger.error(error_msg)
# 감사 로그 기록 (실패)
self._log_audit_event(
operation=FileOperation.DELETE,
path=source,
user=os.getenv('USER', 'unknown'),
success=False,
error_message=str(e)
)
return {'success': False, 'error': error_msg}
except Exception as e:
error_msg = f'파일 이동 처리 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
def search_files(self, path: str, pattern: str, excludePatterns: Optional[List[str]] = None) -> Dict[str, Any]:
"""파일 검색 (핵심 기능 10)"""
try:
self.logger.info(f"파일 검색 시도: {path}, 패턴: {pattern}")
# Roots 프로토콜 권한 확인
root_info = self._find_root_for_path(path)
if not root_info:
error_msg = f'경로가 허용되지 않음: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 읽기 권한 확인
if not self._check_permission(root_info, FileOperation.READ):
error_msg = f'읽기 권한이 없습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 경로 존재 확인
if not os.path.exists(path):
error_msg = f'경로가 존재하지 않습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 디렉토리 확인
if not os.path.isdir(path):
error_msg = f'검색 경로는 디렉토리여야 합니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 제외 패턴 기본값 설정
if excludePatterns is None:
excludePatterns = ['.*', '__pycache__', '*.pyc', '*.pyo', 'node_modules', '.git']
found_files = []
found_directories = []
# 재귀적 검색
for root, dirs, files in os.walk(path):
try:
# 제외 패턴에 따라 디렉토리 필터링
dirs[:] = [d for d in dirs if not any(fnmatch.fnmatch(d, pattern) for pattern in excludePatterns)]
# 파일 검색
for file in files:
# 제외 패턴 확인
if any(fnmatch.fnmatch(file, exc_pattern) for exc_pattern in excludePatterns):
continue
# 패턴 매치 확인 (대소문자 무시)
if fnmatch.fnmatch(file.lower(), pattern.lower()):
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, path)
# 파일 메타데이터 수집
try:
file_stat = os.stat(file_path)
found_files.append({
'path': file_path,
'relative_path': relative_path,
'name': file,
'size': file_stat.st_size,
'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
'type': 'file'
})
except OSError:
# 권한 부족이나 삭제된 파일 등의 경우
continue
# 디렉토리 검색
for dir_name in dirs:
if fnmatch.fnmatch(dir_name.lower(), pattern.lower()):
dir_path = os.path.join(root, dir_name)
relative_path = os.path.relpath(dir_path, path)
# 디렉토리 메타데이터 수집
try:
dir_stat = os.stat(dir_path)
found_directories.append({
'path': dir_path,
'relative_path': relative_path,
'name': dir_name,
'modified': datetime.fromtimestamp(dir_stat.st_mtime).isoformat(),
'type': 'directory'
})
except OSError:
continue
except PermissionError:
# 권한이 없는 디렉토리는 건너뛰기
continue
# 결과 정렬 (이름순)
found_files.sort(key=lambda x: x['name'].lower())
found_directories.sort(key=lambda x: x['name'].lower())
self.logger.info(f"파일 검색 완료: {len(found_files)}개 파일, {len(found_directories)}개 디렉토리")
return {
'success': True,
'search_path': path,
'pattern': pattern,
'exclude_patterns': excludePatterns,
'files': found_files,
'directories': found_directories,
'total_files': len(found_files),
'total_directories': len(found_directories)
}
except Exception as e:
error_msg = f'파일 검색 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
def get_file_info(self, path: str) -> Dict[str, Any]:
"""파일 정보 조회 (핵심 기능 11)"""
try:
self.logger.info(f"파일 정보 조회: {path}")
# Roots 프로토콜 권한 확인
root_info = self._find_root_for_path(path)
if not root_info:
error_msg = f'경로가 허용되지 않음: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 읽기 권한 확인
if not self._check_permission(root_info, FileOperation.READ):
error_msg = f'읽기 권한이 없습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 경로 존재 확인
if not os.path.exists(path):
error_msg = f'경로가 존재하지 않습니다: {path}'
self.logger.warning(error_msg)
return {'success': False, 'error': error_msg}
# 파일 통계 정보 수집
stat_info = os.stat(path)
# 기본 정보
file_info = {
'success': True,
'path': path,
'name': os.path.basename(path),
'absolute_path': os.path.abspath(path),
'exists': True,
'type': 'directory' if os.path.isdir(path) else 'file',
'size': stat_info.st_size,
'created': datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
'modified': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
'accessed': datetime.fromtimestamp(stat_info.st_atime).isoformat(),
'permissions': {
'readable': os.access(path, os.R_OK),
'writable': os.access(path, os.W_OK),
'executable': os.access(path, os.X_OK),
'mode': oct(stat_info.st_mode)[-3:] # rwx permissions
},
'owner': {
'uid': stat_info.st_uid,
'gid': stat_info.st_gid
}
}
# 파일별 추가 정보
if os.path.isfile(path):
# MIME 타입 추정
mime_type, encoding = mimetypes.guess_type(path)
file_info['mime_type'] = mime_type
file_info['encoding'] = encoding
# 파일 확장자
file_info['extension'] = os.path.splitext(path)[1].lower()
# 해시 계산 (작은 파일만)
if stat_info.st_size < 1024 * 1024: # 1MB 미만
try:
with open(path, 'rb') as f:
content = f.read()
file_info['checksums'] = {
'md5': hashlib.md5(content).hexdigest(),
'sha256': hashlib.sha256(content).hexdigest()
}
except Exception:
file_info['checksums'] = None
else:
file_info['checksums'] = None
# 텍스트 파일 여부 확인
try:
with open(path, 'r', encoding='utf-8') as f:
f.read(100) # 첫 100문자만 읽어서 텍스트 파일인지 확인
file_info['is_text'] = True
except (UnicodeDecodeError, PermissionError):
file_info['is_text'] = False
elif os.path.isdir(path):
# 디렉토리 내용 개수
try:
contents = os.listdir(path)
files = [f for f in contents if os.path.isfile(os.path.join(path, f))]
dirs = [d for d in contents if os.path.isdir(os.path.join(path, d))]
file_info['directory_info'] = {
'total_items': len(contents),
'file_count': len(files),
'directory_count': len(dirs),
'is_empty': len(contents) == 0
}
except PermissionError:
file_info['directory_info'] = {'error': 'Permission denied'}
# 심볼릭 링크 정보
if os.path.islink(path):
file_info['is_symlink'] = True
file_info['symlink_target'] = os.readlink(path)
file_info['symlink_target_exists'] = os.path.exists(os.readlink(path))
else:
file_info['is_symlink'] = False
self.logger.info(f"파일 정보 조회 완료: {path}")
return file_info
except Exception as e:
error_msg = f'파일 정보 조회 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg, 'path': path}
def list_allowed_directories(self) -> Dict[str, Any]:
"""허용된 디렉토리 목록 조회 (핵심 기능 12)"""
try:
self.logger.info("허용된 디렉토리 목록 조회")
# Roots 정보를 기반으로 허용된 디렉토리 수집
allowed_dirs = []
for root_name, root_config in self.roots.items():
root_path = root_config.path
permission_level = root_config.permission_level
# 경로 존재 확인
exists = os.path.exists(root_path)
accessible = False
if exists:
try:
# 접근 가능성 확인
accessible = os.access(root_path, os.R_OK)
# 디렉토리 정보 수집
stat_info = os.stat(root_path)
dir_info = {
'root_name': root_name,
'path': root_path,
'name': os.path.basename(root_path) or root_path,
'absolute_path': os.path.abspath(root_path),
'permission_level': permission_level.value,
'exists': True,
'accessible': accessible,
'is_directory': os.path.isdir(root_path),
'created': datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
'modified': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
'permissions': {
'readable': os.access(root_path, os.R_OK),
'writable': os.access(root_path, os.W_OK),
'executable': os.access(root_path, os.X_OK)
}
}
# 디렉토리인 경우 내용 개수 확인
if os.path.isdir(root_path):
try:
contents = os.listdir(root_path)
dir_info['item_count'] = len(contents)
except PermissionError:
dir_info['item_count'] = None
else:
dir_info['item_count'] = None
except OSError as e:
dir_info = {
'root_name': root_name,
'path': root_path,
'name': os.path.basename(root_path) or root_path,
'permission_level': permission_level.value,
'exists': True,
'accessible': False,
'error': f'통계 정보 수집 실패: {str(e)}'
}
else:
dir_info = {
'root_name': root_name,
'path': root_path,
'name': os.path.basename(root_path) or root_path,
'permission_level': permission_level.value,
'exists': False,
'accessible': False,
'error': '경로가 존재하지 않음'
}
allowed_dirs.append(dir_info)
# 권한 레벨별 통계
permission_stats = {}
for permission_level in PermissionLevel:
count = sum(1 for d in allowed_dirs if d.get('permission_level') == permission_level.value)
permission_stats[permission_level.value] = count
# 접근 가능한 디렉토리 통계
accessible_count = sum(1 for d in allowed_dirs if d.get('accessible', False))
existing_count = sum(1 for d in allowed_dirs if d.get('exists', False))
self.logger.info(f"허용된 디렉토리 목록 조회 완료: 총 {len(allowed_dirs)}개")
return {
'success': True,
'total_directories': len(allowed_dirs),
'accessible_directories': accessible_count,
'existing_directories': existing_count,
'permission_statistics': permission_stats,
'directories': allowed_dirs,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
error_msg = f'허용된 디렉토리 목록 조회 실패: {str(e)}'
self.logger.error(error_msg)
return {'success': False, 'error': error_msg}
def cleanup(self):
"""리소스 정리"""
try:
if os.path.exists(self.db_path):
os.remove(self.db_path)
self.logger.info("CompleteFilesystemMCP 정리 완료")
except Exception as e:
self.logger.error(f"리소스 정리 실패: {e}")
def test_complete_filesystem_mcp():
"""테스트 함수"""
print("=== Complete Filesystem MCP 테스트 시작 ===")
# MCP 인스턴스 생성
fs_mcp = CompleteFilesystemMCP()
try:
# 테스트 파일 경로
test_file = "/home/skyki/qwen2.5/test_file.txt"
test_dir = "/home/skyki/qwen2.5/test_directory"
# 1. 파일 쓰기 테스트
print("\n=== 파일 쓰기 테스트 ===")
write_result = fs_mcp.filesystem__write_file(test_file, "Hello, Complete Filesystem MCP!")
print(f"쓰기 성공: {write_result['success']}")
if write_result['success']:
print(f"파일 크기: {write_result['size']} 바이트")
# 2. 파일 읽기 테스트
print("\n=== 파일 읽기 테스트 ===")
read_result = fs_mcp.filesystem__read_file(test_file)
print(f"읽기 성공: {read_result['success']}")
if read_result['success']:
print(f"내용: {read_result['content'][:50]}...")
# 3. 파일 편집 테스트
print("\n=== 파일 편집 테스트 ===")
edit_result = fs_mcp.edit_file(test_file, "Hello", "Hi")
print(f"편집 성공: {edit_result['success']}")
if edit_result['success']:
print(f"변경 횟수: {edit_result['changes_count']}")
# 4. 디렉토리 생성 테스트
print("\n=== 디렉토리 생성 테스트 ===")
dir_result = fs_mcp.create_directory(test_dir)
print(f"디렉토리 생성 성공: {dir_result['success']}")
# 5. 여러 파일 읽기 테스트
print("\n=== 여러 파일 읽기 테스트 ===")
multi_result = fs_mcp.read_multiple_files([test_file, "/home/skyki/qwen2.5/project_plan.md"])
print(f"여러 파일 읽기 성공: {multi_result['successful_reads']}/{multi_result['total_files']}")
# 6. 디렉토리 목록 조회 (기본)
print("\n=== 디렉토리 목록 조회 테스트 ===")
list_result = fs_mcp.filesystem__list_directory("/home/skyki/qwen2.5")
print(f"목록 조회 성공: {list_result['success']}")
if list_result['success']:
print(f"항목 수: {list_result['count']}")
print(f"첫 5개 항목: {[item['name'] for item in list_result['items'][:5]]}")
# 7. 크기별 디렉토리 목록 조회
print("\n=== 크기별 디렉토리 목록 조회 테스트 ===")
size_list_result = fs_mcp.list_directory_with_sizes("/home/skyki/qwen2.5", sort_by="size")
print(f"크기별 목록 조회 성공: {size_list_result['success']}")
if size_list_result['success']:
print(f"총 항목 수: {size_list_result['count']}")
print(f"총 크기: {size_list_result['total_size_display']}")
print("크기 기준 상위 3개 항목:")
for item in size_list_result['items'][:3]:
print(f" - {item['name']}: {item.get('size_display', 'N/A')}")
# 8. 디렉토리 트리 구조 조회
print("\n=== 디렉토리 트리 구조 조회 테스트 ===")
tree_result = fs_mcp.directory_tree("/home/skyki/qwen2.5", max_depth=2, include_hidden=False)
print(f"트리 조회 성공: {tree_result['success']}")
if tree_result['success']:
tree_data = tree_result['tree']
print(f"루트 디렉토리: {tree_data['name']}")
print(f"직접 하위 항목 수: {tree_data['stats']['total_items']}")
print(f"파일 수: {tree_data['stats']['total_files']}")
print(f"디렉토리 수: {tree_data['stats']['total_directories']}")
# 첫 번째 레벨의 몇 개 항목 표시
print("첫 번째 레벨 항목 예시:")
for child in tree_data['children'][:3]:
print(f" - {child['name']} ({child['type']})")
# 9. 파일 이동/이름 변경 테스트
print("\n=== 파일 이동/이름 변경 테스트 ===")
move_source = "/home/skyki/qwen2.5/test_file.txt"
move_dest = "/home/skyki/qwen2.5/test_directory/moved_file.txt"
move_result = fs_mcp.move_file(move_source, move_dest, overwrite=True)
print(f"파일 이동 성공: {move_result['success']}")
if move_result['success']:
print(f"이동: {move_result['source']} -> {move_result['destination']}")
print(f"작업 유형: {move_result['operation_type']}")
print(f"덮어쓰기 발생: {move_result['overwrite_occurred']}")
# 이동된 파일 확인
if os.path.exists(move_dest):
print(f"이동된 파일 존재 확인: ✅")
if not os.path.exists(move_source):
print(f"원본 파일 삭제 확인: ✅")
# Root 정보 확인
print("\n=== Root 정보 ===")
print(f"등록된 Root 수: {len(fs_mcp.roots)}")
for name, config in fs_mcp.roots.items():
print(f" - {name}: {config.path} ({config.permission_level.value})")
# 정리
if os.path.exists(test_file):
os.remove(test_file)
# 이동된 파일 정리
moved_file = "/home/skyki/qwen2.5/test_directory/moved_file.txt"
if os.path.exists(moved_file):
os.remove(moved_file)
if os.path.exists(test_dir):
try:
os.rmdir(test_dir)
except OSError:
# 디렉토리가 비어있지 않은 경우
import shutil
shutil.rmtree(test_dir)
finally:
fs_mcp.cleanup()
print("\n=== 테스트 완료 ===")
if __name__ == "__main__":
test_complete_filesystem_mcp()