#!/usr/bin/env python3
"""
Filesystem MCP Base - 기본 클래스와 유틸리티
Complete Filesystem MCP의 기본 클래스들과 유틸리티 함수들을 포함합니다.
- Enum 클래스들 (PermissionLevel, FileOperation)
- 데이터 클래스들 (RootConfig, FileMetadata, AuditEvent)
- CompleteFilesystemMCP 초기화 및 내부 유틸리티
"""
import os
import sys
import sqlite3
import json
import hashlib
import mimetypes
import tempfile
import shutil
import threading
import time
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Union
from dataclasses import dataclass, asdict
from enum import Enum
import glob
import fnmatch
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class PermissionLevel(Enum):
"""권한 레벨 정의"""
READ_ONLY = "read_only"
READ_WRITE = "read_write"
FULL_ACCESS = "full_access"
RESTRICTED = "restricted"
class FileOperation(Enum):
"""파일 작업 타입"""
READ = "read"
WRITE = "write"
DELETE = "delete"
CREATE = "create"
MOVE = "move"
SEARCH = "search"
@dataclass
class RootConfig:
"""Roots 프로토콜 설정"""
name: str
path: str
permission_level: PermissionLevel
allowed_operations: List[FileOperation]
max_file_size: int = 100 * 1024 * 1024 # 100MB
audit_enabled: bool = True
def to_dict(self):
"""딕셔너리로 변환"""
return {
'name': self.name,
'path': self.path,
'permission_level': self.permission_level.value,
'allowed_operations': [op.value for op in self.allowed_operations],
'max_file_size': self.max_file_size,
'audit_enabled': self.audit_enabled
}
@dataclass
class FileMetadata:
"""파일 메타데이터"""
path: str
name: str
size: int
modified_time: datetime
created_time: datetime
mime_type: str
permissions: str
file_hash: str
is_directory: bool
is_symlink: bool
symlink_target: Optional[str] = None
@dataclass
class AuditEvent:
"""감사 이벤트"""
timestamp: datetime
operation: FileOperation
path: str
user: str
success: bool
file_size: Optional[int] = None
error_message: Optional[str] = None
details: Optional[Dict] = None
class FilesystemMCPBase:
"""파일시스템 MCP 기본 클래스"""
def __init__(self, db_path: Optional[str] = None):
"""초기화"""
# 로거 먼저 설정
self.logger = logging.getLogger("CompleteFilesystemMCP")
# DB 경로 설정
self.db_path = db_path or tempfile.mktemp(suffix='.db')
# Roots 설정
self.roots: Dict[str, RootConfig] = {}
# 감사 로그 저장소
self.audit_log: List[AuditEvent] = []
# 스레드 안전성을 위한 락
self.file_lock = threading.RLock()
# GPU 환경변수 설정 (CLAUDE.md 지침)
self._setup_gpu_environment()
# 데이터베이스 초기화
self._init_database()
# 기본 루트 추가
self._setup_default_roots()
self.logger.info(f"CompleteFilesystemMCP 초기화 완료: DB={self.db_path}")
def _setup_gpu_environment(self):
"""GPU 환경변수 설정 (CLAUDE.md 지침 반영)"""
try:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # 경고 숨김
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true' # 메모리 증분 할당
os.environ['CUDA_VISIBLE_DEVICES'] = '0' # GPU 0 사용
os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'
self.logger.debug("GPU 환경변수 설정 완료")
except Exception as e:
self.logger.warning(f"GPU 환경변수 설정 실패: {e}")
def _init_database(self):
"""데이터베이스 초기화"""
try:
with self.file_lock:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Roots 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS roots (
name TEXT PRIMARY KEY,
path TEXT NOT NULL,
permission_level TEXT NOT NULL,
allowed_operations TEXT NOT NULL,
config TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 파일 메타데이터 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS file_metadata (
path TEXT PRIMARY KEY,
name TEXT NOT NULL,
size INTEGER NOT NULL,
modified_time TIMESTAMP NOT NULL,
created_time TIMESTAMP NOT NULL,
mime_type TEXT NOT NULL,
permissions TEXT NOT NULL,
file_hash TEXT NOT NULL,
is_directory BOOLEAN NOT NULL,
is_symlink BOOLEAN NOT NULL,
symlink_target TEXT,
last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 감사 로그 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP NOT NULL,
operation TEXT NOT NULL,
path TEXT NOT NULL,
user TEXT NOT NULL,
success BOOLEAN NOT NULL,
file_size INTEGER,
error_message TEXT,
details TEXT
)
''')
conn.commit()
conn.close()
self.logger.info("데이터베이스 초기화 완료")
except Exception as e:
self.logger.error(f"데이터베이스 초기화 실패: {e}")
raise
def _setup_default_roots(self):
"""기본 Root 설정"""
try:
# 현재 작업 디렉토리
current_dir = os.getcwd()
default_root = RootConfig(
name="current_directory",
path=current_dir,
permission_level=PermissionLevel.READ_WRITE,
allowed_operations=[
FileOperation.READ,
FileOperation.WRITE,
FileOperation.CREATE,
FileOperation.DELETE,
FileOperation.MOVE,
FileOperation.SEARCH
],
max_file_size=50 * 1024 * 1024, # 50MB
audit_enabled=True
)
self.add_root(default_root)
# 홈 디렉토리 (읽기 전용)
home_dir = os.path.expanduser("~")
if home_dir != current_dir:
home_root = RootConfig(
name="home_directory",
path=home_dir,
permission_level=PermissionLevel.READ_ONLY,
allowed_operations=[
FileOperation.READ,
FileOperation.SEARCH
],
max_file_size=10 * 1024 * 1024, # 10MB
audit_enabled=True
)
self.add_root(home_root)
except Exception as e:
self.logger.warning(f"기본 Root 설정 실패: {e}")
def add_root(self, config: RootConfig) -> bool:
"""새로운 Root 경로 추가"""
try:
self.logger.info(f"Root 추가 시도: {config.name} -> {config.path}")
# 경로 존재 확인
if not os.path.exists(config.path):
self.logger.error(f"경로가 존재하지 않습니다: {config.path}")
return False
# 절대 경로로 변환
config.path = os.path.abspath(config.path)
# 권한 확인
if not os.access(config.path, os.R_OK):
self.logger.error(f"읽기 권한이 없습니다: {config.path}")
return False
with self.file_lock:
# 데이터베이스에 저장
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO roots
(name, path, permission_level, allowed_operations, config)
VALUES (?, ?, ?, ?, ?)
''', (
config.name,
config.path,
config.permission_level.value,
json.dumps([op.value for op in config.allowed_operations]),
json.dumps(config.to_dict())
))
conn.commit()
conn.close()
# 메모리에 저장
self.roots[config.name] = config
self.logger.info(f"Root '{config.name}' 추가 완료: {config.path}")
return True
except Exception as e:
self.logger.error(f"Root 추가 실패: {e}")
return False
def _calculate_file_hash(self, filepath: str) -> str:
"""파일 해시 계산"""
try:
hasher = hashlib.sha256()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hasher.update(chunk)
return hasher.hexdigest()
except Exception as e:
self.logger.error(f"파일 해시 계산 실패 {filepath}: {e}")
return ""
def _get_file_metadata(self, filepath: str) -> Optional[FileMetadata]:
"""파일 메타데이터 수집"""
try:
path_obj = Path(filepath)
if not path_obj.exists():
return None
stat_info = path_obj.stat()
# 파일 해시 계산 (디렉토리가 아닌 경우만)
file_hash = ""
if path_obj.is_file():
file_hash = self._calculate_file_hash(filepath)
# MIME 타입 감지
mime_type, _ = mimetypes.guess_type(filepath)
if not mime_type:
mime_type = "application/octet-stream" if path_obj.is_file() else "inode/directory"
# 심볼릭 링크 대상
symlink_target = None
if path_obj.is_symlink():
try:
symlink_target = str(path_obj.readlink())
except:
symlink_target = None
return FileMetadata(
path=filepath,
name=path_obj.name,
size=stat_info.st_size,
modified_time=datetime.fromtimestamp(stat_info.st_mtime),
created_time=datetime.fromtimestamp(stat_info.st_ctime),
mime_type=mime_type,
permissions=oct(stat_info.st_mode)[-3:],
file_hash=file_hash,
is_directory=path_obj.is_dir(),
is_symlink=path_obj.is_symlink(),
symlink_target=symlink_target
)
except Exception as e:
self.logger.error(f"파일 메타데이터 수집 실패 {filepath}: {e}")
return None
def _log_audit_event(self, operation: FileOperation, path: str,
user: str, success: bool, file_size: Optional[int] = None,
error_message: Optional[str] = None, details: Optional[Dict] = None):
"""감사 이벤트 로깅"""
try:
event = AuditEvent(
timestamp=datetime.now(),
operation=operation,
path=path,
user=user,
success=success,
file_size=file_size,
error_message=error_message,
details=details
)
# 메모리에 저장 (제한된 크기)
self.audit_log.append(event)
if len(self.audit_log) > 1000: # 최대 1000개 유지
self.audit_log.pop(0)
# 데이터베이스에 저장
with self.file_lock:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO audit_log
(timestamp, operation, path, user, success, file_size, error_message, details)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
event.timestamp,
event.operation.value,
event.path,
event.user,
event.success,
event.file_size,
event.error_message,
json.dumps(event.details) if event.details else None
))
conn.commit()
conn.close()
self.logger.debug(f"감사 이벤트 로깅: {operation.value} {path} {'성공' if success else '실패'}")
except Exception as e:
self.logger.error(f"감사 이벤트 로깅 실패: {e}")
def _check_permission(self, root_name: str, operation: FileOperation) -> bool:
"""권한 확인"""
if root_name not in self.roots:
self.logger.debug(f"Root '{root_name}' 찾을 수 없음")
return False
config = self.roots[root_name]
has_permission = operation in config.allowed_operations
self.logger.debug(f"권한 확인: {root_name} {operation.value} -> {has_permission}")
return has_permission
def _find_root_for_path(self, path: str) -> Optional[str]:
"""경로에 해당하는 Root 찾기"""
abs_path = os.path.abspath(path)
self.logger.debug(f"Root 찾기: {path} -> {abs_path}")
# 가장 구체적인 매치 찾기 (가장 긴 공통 경로)
best_match = None
best_match_length = 0
for root_name, config in self.roots.items():
self.logger.debug(f" 확인: {config.path} 하위인가?")
if abs_path.startswith(config.path):
match_length = len(config.path)
if match_length > best_match_length:
best_match = root_name
best_match_length = match_length
self.logger.debug(f" 새로운 최적 매치: {root_name} (길이: {match_length})")
if best_match:
self.logger.debug(f" 최종 매치: {best_match}")
else:
self.logger.debug(" 매치되는 Root 없음")
return best_match
def _check_file_size_limit(self, filepath: str, root_name: str) -> bool:
"""파일 크기 제한 확인"""
try:
if not os.path.exists(filepath):
return True # 새 파일은 크기 제한 없음
file_size = os.path.getsize(filepath)
max_size = self.roots[root_name].max_file_size
if file_size > max_size:
self.logger.warning(f"파일 크기 제한 초과: {file_size} > {max_size}")
return False
return True
except Exception as e:
self.logger.error(f"파일 크기 확인 실패: {e}")
return False
def cleanup(self):
"""리소스 정리"""
try:
if os.path.exists(self.db_path):
os.remove(self.db_path)
self.logger.info("CompleteFilesystemMCP 정리 완료")
except Exception as e:
self.logger.error(f"리소스 정리 실패: {e}")