Skip to main content
Glama
Skynotdie

MCP Localization Project

by Skynotdie
filesystem_base.py (16.6 kB)
#!/usr/bin/env python3
"""
Filesystem MCP Base - base classes and utilities.

Contains the base classes and utility functions of the Complete Filesystem MCP.
- Enum classes (PermissionLevel, FileOperation)
- Data classes (RootConfig, FileMetadata, AuditEvent)
- CompleteFilesystemMCP initialization and internal utilities
"""

import os
import sys
import sqlite3
import json
import hashlib
import mimetypes
import tempfile
import shutil
import threading
import time
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Union
from dataclasses import dataclass, asdict
from enum import Enum
import glob
import fnmatch

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class PermissionLevel(Enum):
    """Permission levels that can be assigned to a root."""

    READ_ONLY = "read_only"
    READ_WRITE = "read_write"
    FULL_ACCESS = "full_access"
    RESTRICTED = "restricted"


class FileOperation(Enum):
    """Kinds of file operations."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    CREATE = "create"
    MOVE = "move"
    SEARCH = "search"


@dataclass
class RootConfig:
    """Configuration of a single root (Roots protocol)."""

    name: str
    path: str
    permission_level: PermissionLevel
    allowed_operations: List[FileOperation]
    max_file_size: int = 100 * 1024 * 1024  # 100MB
    audit_enabled: bool = True

    def to_dict(self):
        """Return a plain dict, with enum members replaced by their values."""
        operation_values = [
            operation.value for operation in self.allowed_operations
        ]
        return dict(
            name=self.name,
            path=self.path,
            permission_level=self.permission_level.value,
            allowed_operations=operation_values,
            max_file_size=self.max_file_size,
            audit_enabled=self.audit_enabled,
        )


@dataclass
class FileMetadata:
    """Metadata describing one filesystem entry."""

    path: str
    name: str
    size: int
    modified_time: datetime
    created_time: datetime
    mime_type: str
    permissions: str
    file_hash: str
    is_directory: bool
    is_symlink: bool
    symlink_target: Optional[str] = None


@dataclass
class AuditEvent:
    """One audit-log entry."""

    timestamp: datetime
    operation: FileOperation
    path: str
    user: str
    success: bool
    file_size: Optional[int] = None
    error_message: Optional[str] = None
    details: Optional[Dict] = None
class FilesystemMCPBase:
    """Base class of the filesystem MCP.

    Keeps a registry of named roots (in memory and in a SQLite database),
    collects file metadata, and records audit events. All database writes
    are serialized through ``self.file_lock``.
    """

    # Maximum number of audit events kept in the in-memory ``audit_log``.
    MAX_AUDIT_LOG_ENTRIES = 1000

    def __init__(self, db_path: Optional[str] = None):
        """Set up logger, database, lock and the default roots.

        Args:
            db_path: Path of the SQLite database file. When omitted (or
                empty) a fresh temporary ``.db`` file is created.
        """
        # Configure the logger first so every later step can log.
        self.logger = logging.getLogger("CompleteFilesystemMCP")

        # Database path. mkstemp creates the file atomically, unlike the
        # deprecated, race-prone mktemp; SQLite treats the resulting empty
        # file as a new database.
        if db_path:
            self.db_path = db_path
        else:
            fd, self.db_path = tempfile.mkstemp(suffix='.db')
            os.close(fd)

        # Registered roots, keyed by root name.
        self.roots: Dict[str, "RootConfig"] = {}

        # In-memory audit log.
        self.audit_log: List["AuditEvent"] = []

        # Re-entrant lock guarding database access.
        self.file_lock = threading.RLock()

        # GPU environment variables (per the CLAUDE.md guidance).
        self._setup_gpu_environment()

        # Create the database tables.
        self._init_database()

        # Register the default roots.
        self._setup_default_roots()

        self.logger.info(f"CompleteFilesystemMCP 초기화 완료: DB={self.db_path}")

    def _setup_gpu_environment(self):
        """Set the GPU-related environment variables (CLAUDE.md guidance).

        Overwrites any values already present in ``os.environ``. Failures
        are logged as warnings and otherwise ignored.
        """
        try:
            os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # hide warnings
            os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'  # incremental memory allocation
            os.environ['CUDA_VISIBLE_DEVICES'] = '0'  # use GPU 0
            os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'
            self.logger.debug("GPU 환경변수 설정 완료")
        except Exception as e:
            self.logger.warning(f"GPU 환경변수 설정 실패: {e}")

    def _execute_statements(self, statements):
        """Run ``(sql, params)`` pairs in one transaction under the lock.

        The connection is closed even when a statement or the commit
        raises, so a failing write cannot leak a connection.

        Args:
            statements: Iterable of ``(sql, params)`` tuples.

        Raises:
            sqlite3.Error: Propagated from the failing statement or commit.
        """
        with self.file_lock:
            conn = sqlite3.connect(self.db_path)
            try:
                cursor = conn.cursor()
                for sql, params in statements:
                    cursor.execute(sql, params)
                conn.commit()
            finally:
                conn.close()

    def _init_database(self):
        """Create the ``roots``, ``file_metadata`` and ``audit_log`` tables.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        roots_table = '''
            CREATE TABLE IF NOT EXISTS roots (
                name TEXT PRIMARY KEY,
                path TEXT NOT NULL,
                permission_level TEXT NOT NULL,
                allowed_operations TEXT NOT NULL,
                config TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        '''
        file_metadata_table = '''
            CREATE TABLE IF NOT EXISTS file_metadata (
                path TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                size INTEGER NOT NULL,
                modified_time TIMESTAMP NOT NULL,
                created_time TIMESTAMP NOT NULL,
                mime_type TEXT NOT NULL,
                permissions TEXT NOT NULL,
                file_hash TEXT NOT NULL,
                is_directory BOOLEAN NOT NULL,
                is_symlink BOOLEAN NOT NULL,
                symlink_target TEXT,
                last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        '''
        audit_log_table = '''
            CREATE TABLE IF NOT EXISTS audit_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TIMESTAMP NOT NULL,
                operation TEXT NOT NULL,
                path TEXT NOT NULL,
                user TEXT NOT NULL,
                success BOOLEAN NOT NULL,
                file_size INTEGER,
                error_message TEXT,
                details TEXT
            )
        '''
        try:
            self._execute_statements([
                (roots_table, ()),
                (file_metadata_table, ()),
                (audit_log_table, ()),
            ])
            self.logger.info("데이터베이스 초기화 완료")
        except Exception as e:
            self.logger.error(f"데이터베이스 초기화 실패: {e}")
            raise

    def _setup_default_roots(self):
        """Register the default roots.

        The current working directory is added read-write (50MB limit);
        the home directory is added read-only (10MB limit) when it differs
        from the working directory. Failures are logged as warnings.
        """
        try:
            # Current working directory
            current_dir = os.getcwd()
            default_root = RootConfig(
                name="current_directory",
                path=current_dir,
                permission_level=PermissionLevel.READ_WRITE,
                allowed_operations=[
                    FileOperation.READ,
                    FileOperation.WRITE,
                    FileOperation.CREATE,
                    FileOperation.DELETE,
                    FileOperation.MOVE,
                    FileOperation.SEARCH
                ],
                max_file_size=50 * 1024 * 1024,  # 50MB
                audit_enabled=True
            )
            self.add_root(default_root)

            # Home directory (read-only)
            home_dir = os.path.expanduser("~")
            if home_dir != current_dir:
                home_root = RootConfig(
                    name="home_directory",
                    path=home_dir,
                    permission_level=PermissionLevel.READ_ONLY,
                    allowed_operations=[
                        FileOperation.READ,
                        FileOperation.SEARCH
                    ],
                    max_file_size=10 * 1024 * 1024,  # 10MB
                    audit_enabled=True
                )
                self.add_root(home_root)
        except Exception as e:
            self.logger.warning(f"기본 Root 설정 실패: {e}")

    def add_root(self, config: "RootConfig") -> bool:
        """Register a new root path.

        ``config.path`` is rewritten in place to its absolute form. The
        root is stored in the database (replacing any row with the same
        name) and then in ``self.roots``.

        Args:
            config: Root configuration to register.

        Returns:
            True on success; False when the path does not exist, is not
            readable, or storing it fails.
        """
        try:
            self.logger.info(f"Root 추가 시도: {config.name} -> {config.path}")

            # The path must exist.
            if not os.path.exists(config.path):
                self.logger.error(f"경로가 존재하지 않습니다: {config.path}")
                return False

            # Normalize to an absolute path.
            config.path = os.path.abspath(config.path)

            # The path must be readable.
            if not os.access(config.path, os.R_OK):
                self.logger.error(f"읽기 권한이 없습니다: {config.path}")
                return False

            insert_root = '''
                INSERT OR REPLACE INTO roots
                (name, path, permission_level, allowed_operations, config)
                VALUES (?, ?, ?, ?, ?)
            '''
            params = (
                config.name,
                config.path,
                config.permission_level.value,
                json.dumps([op.value for op in config.allowed_operations]),
                json.dumps(config.to_dict())
            )

            # Hold the (re-entrant) lock across both writes so the database
            # row and the in-memory registry change together.
            with self.file_lock:
                self._execute_statements([(insert_root, params)])
                self.roots[config.name] = config

            self.logger.info(f"Root '{config.name}' 추가 완료: {config.path}")
            return True
        except Exception as e:
            self.logger.error(f"Root 추가 실패: {e}")
            return False

    def _calculate_file_hash(self, filepath: str) -> str:
        """Return the SHA-256 hex digest of a file, or "" on failure."""
        try:
            hasher = hashlib.sha256()
            with open(filepath, 'rb') as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hasher.update(chunk)
            return hasher.hexdigest()
        except Exception as e:
            self.logger.error(f"파일 해시 계산 실패 {filepath}: {e}")
            return ""

    def _get_file_metadata(self, filepath: str) -> Optional["FileMetadata"]:
        """Collect metadata for a path.

        Args:
            filepath: Path to inspect.

        Returns:
            A FileMetadata instance, or None when the path does not exist
            or collection fails.
        """
        try:
            path_obj = Path(filepath)
            if not path_obj.exists():
                return None

            stat_info = path_obj.stat()

            # Hash regular files only; directories keep an empty hash.
            file_hash = ""
            if path_obj.is_file():
                file_hash = self._calculate_file_hash(filepath)

            # Guess the MIME type from the name, with a generic fallback.
            mime_type, _ = mimetypes.guess_type(filepath)
            if not mime_type:
                mime_type = "application/octet-stream" if path_obj.is_file() else "inode/directory"

            # Symlink target, when the path is a symlink.
            symlink_target = None
            if path_obj.is_symlink():
                try:
                    symlink_target = str(path_obj.readlink())
                except (OSError, NotImplementedError, AttributeError):
                    # Unreadable link, platform without symlink support, or
                    # a Python without Path.readlink: leave the target unset.
                    symlink_target = None

            return FileMetadata(
                path=filepath,
                name=path_obj.name,
                size=stat_info.st_size,
                modified_time=datetime.fromtimestamp(stat_info.st_mtime),
                created_time=datetime.fromtimestamp(stat_info.st_ctime),
                mime_type=mime_type,
                permissions=oct(stat_info.st_mode)[-3:],
                file_hash=file_hash,
                is_directory=path_obj.is_dir(),
                is_symlink=path_obj.is_symlink(),
                symlink_target=symlink_target
            )
        except Exception as e:
            self.logger.error(f"파일 메타데이터 수집 실패 {filepath}: {e}")
            return None

    def _log_audit_event(self, operation: "FileOperation", path: str, user: str,
                         success: bool, file_size: Optional[int] = None,
                         error_message: Optional[str] = None,
                         details: Optional[Dict] = None):
        """Record an audit event in memory and in the database.

        Failures are logged and swallowed so that auditing never breaks
        the operation being audited.

        Args:
            operation: Operation that was attempted.
            path: Path the operation targeted.
            user: User the operation is attributed to.
            success: Whether the operation succeeded.
            file_size: Optional size of the file involved.
            error_message: Optional error text.
            details: Optional extra data; stored as JSON when non-empty.
        """
        try:
            event = AuditEvent(
                timestamp=datetime.now(),
                operation=operation,
                path=path,
                user=user,
                success=success,
                file_size=file_size,
                error_message=error_message,
                details=details
            )

            # Keep the in-memory log bounded by dropping the oldest events.
            self.audit_log.append(event)
            overflow = len(self.audit_log) - self.MAX_AUDIT_LOG_ENTRIES
            if overflow > 0:
                del self.audit_log[:overflow]

            insert_event = '''
                INSERT INTO audit_log
                (timestamp, operation, path, user, success, file_size, error_message, details)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            '''
            params = (
                # Explicit text form: same value the implicit sqlite3
                # datetime adapter (deprecated since Python 3.12) stored.
                event.timestamp.isoformat(" "),
                event.operation.value,
                event.path,
                event.user,
                event.success,
                event.file_size,
                event.error_message,
                json.dumps(event.details) if event.details else None
            )
            self._execute_statements([(insert_event, params)])

            self.logger.debug(f"감사 이벤트 로깅: {operation.value} {path} {'성공' if success else '실패'}")
        except Exception as e:
            self.logger.error(f"감사 이벤트 로깅 실패: {e}")

    def _check_permission(self, root_name: str, operation: "FileOperation") -> bool:
        """Return True when the named root exists and allows the operation."""
        if root_name not in self.roots:
            self.logger.debug(f"Root '{root_name}' 찾을 수 없음")
            return False

        config = self.roots[root_name]
        has_permission = operation in config.allowed_operations
        self.logger.debug(f"권한 확인: {root_name} {operation.value} -> {has_permission}")
        return has_permission

    def _find_root_for_path(self, path: str) -> Optional[str]:
        """Find the most specific registered root that contains a path.

        Containment is decided on path-component boundaries, so a root at
        ``/data/app`` contains ``/data/app`` and ``/data/app/x`` but not
        ``/data/application``.

        Args:
            path: Path to look up; made absolute before comparison.

        Returns:
            Name of the containing root with the longest path, or None.
        """
        abs_path = os.path.abspath(path)
        self.logger.debug(f"Root 찾기: {path} -> {abs_path}")

        # Most specific match wins (longest root path).
        best_match = None
        best_match_length = 0

        for root_name, config in self.roots.items():
            self.logger.debug(f" 확인: {config.path} 하위인가?")
            root_path = config.path
            # A bare startswith() would also accept sibling directories
            # that merely share a name prefix with the root.
            root_prefix = root_path if root_path.endswith(os.sep) else root_path + os.sep
            if abs_path == root_path or abs_path.startswith(root_prefix):
                match_length = len(root_path)
                if match_length > best_match_length:
                    best_match = root_name
                    best_match_length = match_length
                    self.logger.debug(f" 새로운 최적 매치: {root_name} (길이: {match_length})")

        if best_match:
            self.logger.debug(f" 최종 매치: {best_match}")
        else:
            self.logger.debug(" 매치되는 Root 없음")
        return best_match

    def _check_file_size_limit(self, filepath: str, root_name: str) -> bool:
        """Check an existing file against the root's ``max_file_size``.

        Returns:
            True when the file does not exist yet or is within the limit;
            False when it exceeds the limit or the check itself fails
            (for example an unknown ``root_name``).
        """
        try:
            if not os.path.exists(filepath):
                return True  # a file that does not exist yet has no size to check

            file_size = os.path.getsize(filepath)
            max_size = self.roots[root_name].max_file_size

            if file_size > max_size:
                self.logger.warning(f"파일 크기 제한 초과: {file_size} > {max_size}")
                return False
            return True
        except Exception as e:
            self.logger.error(f"파일 크기 확인 실패: {e}")
            return False

    def cleanup(self):
        """Release resources by deleting the database file.

        Removes the file at ``self.db_path`` if it exists, whether it was
        created here or supplied by the caller. Failures are logged.
        """
        try:
            if os.path.exists(self.db_path):
                os.remove(self.db_path)
            self.logger.info("CompleteFilesystemMCP 정리 완료")
        except Exception as e:
            self.logger.error(f"리소스 정리 실패: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.